import re
import threading
import time
from dataclasses import dataclass
from functools import wraps
from typing import Any, Dict, List, Optional

import requests

from utils.logging_config import get_logger
from core.metadata.cache import get_metadata_cache

logger = get_logger("deezer_client")

# Global rate limiting variables
_last_api_call_time = 0
_api_call_lock = threading.Lock()
MIN_API_INTERVAL = 1.0  # 1 second between API calls (Deezer soft limit: 50 req/5s)


def rate_limited(func):
    """Decorator to enforce rate limiting on Deezer API calls.

    Every decorated callable shares one module-level timestamp and lock, so
    calls are spaced at least ``MIN_API_INTERVAL`` seconds apart across all
    threads. Each call is also reported to ``api_call_tracker`` under the
    ``'deezer'`` key.

    If the wrapped function raises an exception whose message mentions
    "rate limit" or "429", the wrapper logs a warning and sleeps 4 seconds
    before re-raising the same exception; other exceptions propagate
    immediately.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        global _last_api_call_time

        with _api_call_lock:
            elapsed = time.time() - _last_api_call_time
            # Only wait for a sane, non-negative gap. A negative value means
            # the wall clock stepped backwards; waiting on it would block
            # every caller (the lock is held) for an unbounded time.
            if 0 <= elapsed < MIN_API_INTERVAL:
                time.sleep(MIN_API_INTERVAL - elapsed)
            _last_api_call_time = time.time()

        # Imported lazily, at call time rather than at module import.
        from core.api_call_tracker import api_call_tracker
        api_call_tracker.record_call('deezer')

        try:
            return func(*args, **kwargs)
        except Exception as e:
            if "rate limit" in str(e).lower() or "429" in str(e):
                logger.warning(f"Deezer rate limit hit, implementing backoff: {e}")
                time.sleep(4.0)
            # Bare raise keeps the original traceback intact.
            raise
    return wrapper


# Pattern matches Deezer's CDN cover/picture URL: a numeric width-x-height
# segment in the path (e.g. ``/1000x1000-000000-80-0-0.jpg``). Captures
# both halves so the replacement can use a single dimension and preserve
# the rest of the path verbatim.
_DEEZER_CDN_SIZE_PATTERN = re.compile(r'/(\d+)x(\d+)-')

# Maximum size Deezer's CDN serves before returning 403. Verified
# empirically against multiple albums — 1900 works reliably, 2000+
# returns Forbidden. CDN serves the source-native size when it's
# smaller than requested, so asking for 1900 is safe even on albums
# whose source upload was lower-res (no upscaling, just same bytes).
_DEEZER_MAX_COVER_SIZE = 1900

# Album-type vocabulary shared by the dataclasses and the Spotify-compatible dicts.
_KNOWN_ALBUM_TYPES = frozenset({'album', 'single', 'ep', 'compilation'})

# Deezer ``record_type`` values that do not map to plain 'album'.
_RECORD_TYPE_TO_ALBUM_TYPE = {'single': 'single', 'ep': 'ep', 'compile': 'compilation'}

# (payload key, pixel dimension) pairs, largest first.
_COVER_SIZES = (('cover_xl', 1000), ('cover_big', 500), ('cover_medium', 250), ('cover_small', 56))
_PICTURE_SIZES = (('picture_xl', 1000), ('picture_big', 500), ('picture_medium', 250), ('picture_small', 56))


def _upgrade_deezer_cover_url(url: str, target_size: int = _DEEZER_MAX_COVER_SIZE) -> str:
    """Rewrite a Deezer CDN cover/picture URL to request a larger size.

    Deezer's API returns ``cover_xl`` / ``picture_xl`` URLs at 1000×1000,
    but the underlying CDN serves up to 1900×1900 by rewriting the size
    segment in the URL path. This helper does the rewrite — same idea as
    ``_upgrade_spotify_image_url`` in ``spotify_client`` and the
    ``mzstatic.com`` size-replacement in ``download_cover_art``.

    Defensive on every input shape:
    - Empty / None URL → returned as-is
    - Non-Deezer URL (no ``dzcdn`` host, no size segment) → returned as-is
    - Already at or above target size → returned as-is (no point rewriting)

    The CDN returns the source-native image bytes when source < target,
    so asking for 1900 on an album whose source was uploaded at 600 just
    returns the 600-pixel image — no upscaling, no failure.
    """
    if not url or 'dzcdn' not in url:
        return url
    match = _DEEZER_CDN_SIZE_PATTERN.search(url)
    if not match:
        return url
    # Only the width is compared; the rewrite always emits a square size.
    current = int(match.group(1))
    if current >= target_size:
        return url
    return _DEEZER_CDN_SIZE_PATTERN.sub(f'/{target_size}x{target_size}-', url, count=1)


def _is_full_track_payload(payload: Optional[Dict[str, Any]]) -> bool:
    """Distinguish a full `/track/` cache hit from partial album-tracks data.

    Three Deezer endpoints feed the per-track cache:

    - `/track/` — full record, includes both `track_position` AND
      `contributors` (the multi-artist list the contributors-upgrade
      path reads).
    - `/album//tracks` — partial; includes `track_position` but omits
      `contributors`.
    - `/search/track` — minimal; lacks `track_position`.

    Pre-fix `get_track_details` only checked `track_position`, so partial
    album-tracks payloads were treated as full hits and the
    contributors-upgrade silently fell back to single-artist tagging
    whenever an album had been fetched before its individual tracks were
    post-processed (issue #588).

    `contributors` key presence is the load-bearing distinction — `[]` is
    a valid value for genuinely single-artist tracks fetched via the
    per-track endpoint, so test for key membership not truthiness.
    """
    if not isinstance(payload, dict):
        return False
    return 'track_position' in payload and 'contributors' in payload


def _album_type_from_record_type(record_type: Any) -> str:
    """Map a Deezer ``record_type`` to 'single' / 'ep' / 'compilation' / 'album'."""
    if isinstance(record_type, str):
        return _RECORD_TYPE_TO_ALBUM_TYPE.get(record_type, 'album')
    return 'album'


def _build_image_list(data: Dict[str, Any], size_keys) -> List[Dict[str, Any]]:
    """Return Spotify-style image dicts for every size key present in ``data``."""
    return [
        {'url': data[key], 'height': dim, 'width': dim}
        for key, dim in size_keys
        if data.get(key)
    ]


def _artist_display_name(artist_data: Any) -> str:
    """Return the artist's name, or 'Unknown Artist' when it is unavailable."""
    if isinstance(artist_data, dict):
        return artist_data.get('name', 'Unknown Artist')
    return 'Unknown Artist'


def _track_artist_names(track_data: Dict[str, Any], primary_artist: str) -> List[str]:
    """Return all contributor names for a multi-artist track, else ``[primary_artist]``."""
    contributors = track_data.get('contributors', [])
    if isinstance(contributors, list) and len(contributors) > 1:
        names = [c['name'] for c in contributors if isinstance(c, dict) and c.get('name')]
        if names:
            return names
    return [primary_artist]


def _duration_ms(item: Dict[str, Any]) -> int:
    """Convert Deezer's ``duration`` (seconds) to milliseconds; missing/null → 0."""
    return (item.get('duration') or 0) * 1000


# ==================== Dataclasses (match iTunesClient / SpotifyClient format) ====================

@dataclass
class Track:
    """Track record built from a raw Deezer track payload."""
    id: str
    name: str
    artists: List[str]
    album: str
    duration_ms: int
    popularity: int
    preview_url: Optional[str] = None
    external_urls: Optional[Dict[str, str]] = None
    image_url: Optional[str] = None
    release_date: Optional[str] = None
    track_number: Optional[int] = None
    disc_number: Optional[int] = None
    album_type: Optional[str] = None
    total_tracks: Optional[int] = None

    @classmethod
    def from_deezer_track(cls, track_data: Dict[str, Any]) -> 'Track':
        """Build a Track from a raw Deezer track dict.

        Args:
            track_data: Track payload as returned by Deezer (search, track
                or album-tracks shape). Missing fields fall back to
                neutral defaults.

        Returns:
            A populated Track. ``album_type`` is taken from the payload only
            when it is a recognised album type; otherwise it is inferred
            from the album's ``nb_tracks`` when available, else left None.
        """
        album_data = track_data.get('album', {})
        album_is_dict = isinstance(album_data, dict)

        # Extract album image
        album_image_url = None
        if album_is_dict:
            album_image_url = (
                album_data.get('cover_xl')
                or album_data.get('cover_big')
                or album_data.get('cover_medium')
            )

        # Get artist name(s) — use contributors for multi-artist tracks (feat. collabs)
        artist_name = _artist_display_name(track_data.get('artist', {}))
        all_artists = _track_artist_names(track_data, artist_name)

        # Get album name
        album_name = ''
        if album_is_dict:
            album_name = album_data.get('title', '')
        elif isinstance(album_data, str):
            album_name = album_data

        # Build external URLs
        external_urls = {}
        if track_data.get('link'):
            external_urls['deezer'] = track_data['link']

        # Deezer search doesn't return album_type directly; infer if nb_tracks available
        nb_tracks = album_data.get('nb_tracks') if album_is_dict else None
        # Deezer sometimes returns 'album'/'single' here, but track objects
        # normally carry the object kind ("track") in this field. Accept only
        # real album types so the inference below is not bypassed.
        album_type = track_data.get('type')
        if not isinstance(album_type, str) or album_type not in _KNOWN_ALBUM_TYPES:
            album_type = None
        if not album_type and nb_tracks:
            if nb_tracks <= 3:
                album_type = 'single'
            elif nb_tracks <= 6:
                album_type = 'ep'
            else:
                album_type = 'album'

        return cls(
            id=str(track_data.get('id', '')),
            name=track_data.get('title', ''),
            artists=all_artists,
            album=album_name,
            duration_ms=_duration_ms(track_data),  # Deezer returns seconds
            popularity=track_data.get('rank', 0),
            preview_url=track_data.get('preview'),
            external_urls=external_urls if external_urls else None,
            image_url=album_image_url,
            release_date=track_data.get('release_date') or (album_data.get('release_date') if album_is_dict else None),
            track_number=track_data.get('track_position'),
            disc_number=track_data.get('disk_number', 1),
            album_type=album_type,
            total_tracks=nb_tracks,
        )


@dataclass
class Artist:
    """Artist record built from a raw Deezer artist payload."""
    id: str
    name: str
    popularity: int
    genres: List[str]
    followers: int
    image_url: Optional[str] = None
    external_urls: Optional[Dict[str, str]] = None

    @classmethod
    def from_deezer_artist(cls, artist_data: Dict[str, Any]) -> 'Artist':
        """Build an Artist from a raw Deezer artist dict.

        ``popularity`` and ``genres`` are always 0 / empty here; ``followers``
        comes from the payload's ``nb_fan``.
        """
        image_url = (
            artist_data.get('picture_xl')
            or artist_data.get('picture_big')
            or artist_data.get('picture_medium')
        )

        external_urls = {}
        if artist_data.get('link'):
            external_urls['deezer'] = artist_data['link']

        return cls(
            id=str(artist_data.get('id', '')),
            name=artist_data.get('name', ''),
            popularity=0,
            genres=[],
            followers=artist_data.get('nb_fan', 0),
            image_url=image_url,
            external_urls=external_urls if external_urls else None
        )


@dataclass
class Album:
    """Album record built from a raw Deezer album payload."""
    id: str
    name: str
    artists: List[str]
    release_date: str
    total_tracks: int
    album_type: str
    image_url: Optional[str] = None
    external_urls: Optional[Dict[str, str]] = None
    explicit: Optional[bool] = None

    @classmethod
    def from_deezer_album(cls, album_data: Dict[str, Any]) -> 'Album':
        """Build an Album from a raw Deezer album dict.

        ``album_type`` is derived from Deezer's ``record_type``
        ('single', 'ep', 'compile' → 'compilation', anything else → 'album').
        """
        image_url = (
            album_data.get('cover_xl')
            or album_data.get('cover_big')
            or album_data.get('cover_medium')
        )

        external_urls = {}
        if album_data.get('link'):
            external_urls['deezer'] = album_data['link']

        artist_name = _artist_display_name(album_data.get('artist', {}))

        # Map Deezer record_type
        album_type = _album_type_from_record_type(album_data.get('record_type', 'album'))

        return cls(
            id=str(album_data.get('id', '')),
            name=album_data.get('title', ''),
            artists=[artist_name],
            release_date=album_data.get('release_date', ''),
            total_tracks=album_data.get('nb_tracks', 0),
            album_type=album_type,
            image_url=image_url,
            external_urls=external_urls if external_urls else None,
            explicit=bool(album_data.get('explicit_lyrics', False)),
        )


@dataclass
class Playlist:
    """Playlist record (fields only; no Deezer constructor in this section)."""
    id: str
    name: str
    description: Optional[str]
    owner: str
    public: bool
    collaborative: bool
    tracks: List[Track]
    total_tracks: int


class DeezerClient:
    """
    Deezer API client for music metadata and playlist access.

    Provides metadata parity with iTunesClient for use as a fallback source.
    Also provides enrichment methods (search_artist, search_album, search_track)
    and playlist methods used by deezer_worker.py.

    Free, no authentication required. Rate limit: ~50 calls/5s.
    """

    BASE_URL = "https://api.deezer.com"

    def __init__(self):
        """Create the HTTP session and load the optional OAuth token from config."""
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'SoulSync/1.0',
            'Accept': 'application/json'
        })
        self._access_token = None
        self._load_token()
        logger.info("Deezer client initialized" + (" (authenticated)" if self._access_token else " (public)"))

    def _load_token(self):
        """Load OAuth access token from config if available."""
        try:
            from config.settings import config_manager
            self._access_token = config_manager.get('deezer.access_token', None)
        except Exception:
            # Best-effort: any config failure leaves the client in public mode.
            self._access_token = None

    def is_user_authenticated(self) -> bool:
        """Check if we have a Deezer OAuth user token (for favorites, playlists, etc.)"""
        return bool(self._access_token)

    def is_authenticated(self) -> bool:
        """Deezer public API requires no authentication — always available"""
        return True

    def reload_config(self):
        """Reload configuration — refresh OAuth token from config."""
        self._load_token()

    def _api_get(self, endpoint: str, params: dict = None, timeout: int = 15) -> Optional[Dict[str, Any]]:
        """Generic GET request to Deezer API with error handling.

        Includes OAuth access_token when available for user-level endpoints.

        Args:
            endpoint: Path relative to ``BASE_URL`` (leading slash optional).
            params: Query parameters. The caller's dict is never modified.
            timeout: Request timeout in seconds.

        Returns:
            The decoded JSON payload, or None on a non-200 status, a Deezer
            ``error`` payload, or any exception (all of which are logged).
        """
        try:
            url = f"{self.BASE_URL}/{endpoint.lstrip('/')}"

            # Work on a copy so the token never leaks into the caller's dict.
            request_params = dict(params) if params else {}

            # Include access token for authenticated requests
            if self._access_token and 'access_token' not in request_params:
                request_params['access_token'] = self._access_token

            response = self.session.get(url, params=request_params, timeout=timeout)

            if response.status_code != 200:
                logger.error(f"Deezer API returned status {response.status_code} for {endpoint}")
                return None

            data = response.json()

            if 'error' in data:
                error = data['error']
                error_type = error.get('type', 'Unknown')
                error_msg = error.get('message', 'Unknown error')
                if error_type == 'DataException':
                    logger.debug(f"Deezer data not found: {endpoint}")
                else:
                    logger.error(f"Deezer API error ({error_type}): {error_msg}")
                return None

            return data

        except Exception as e:
            logger.error(f"Error in Deezer API request ({endpoint}): {e}")
            return None

    # ==================== Metadata Source Methods (iTunesClient parity) ====================
    # These methods follow the same interface as iTunesClient so DeezerClient
    # can serve as a drop-in fallback metadata source in SpotifyClient.

    @rate_limited
    def search_tracks(
        self,
        query: str = '',
        limit: int = 20,
        *,
        track: Optional[str] = None,
        artist: Optional[str] = None,
        album: Optional[str] = None,
    ) -> List[Track]:
        """Search for tracks — returns Track dataclass list (metadata source interface).

        Two call modes:

        1. **Free-text** (`query='Foreigner Dirty White Boy'`) — legacy
           shape, passes the string straight to Deezer's `q` param. Same
           behaviour as before, kept for backward compat.
        2. **Field-scoped** (`track='Dirty White Boy', artist='Foreigner'`)
           — builds Deezer's advanced search syntax
           (`track:"X" artist:"Y"`). Massively tighter relevance than the
           free-text path because the API matches each term in the right
           field instead of anywhere across title / lyrics / artist /
           album / contributors. Without this, the Deezer ranking buries
           the canonical track under karaoke / cover / "originally
           performed by" variants — see issue #534.

        Field-scoped form is used whenever ``track``, ``artist`` or
        ``album`` is provided. ``query`` is ignored in that case (the field
        params are authoritative). When all are missing, falls through to
        ``query``.

        The cache key is the constructed query string in either case so
        the two paths share entries naturally.
        """
        # Build the actual API query — advanced syntax when callers pass
        # field hints, raw query otherwise.
        used_advanced = bool(track or artist or album)
        if used_advanced:
            api_query = self._build_advanced_query(track=track, artist=artist, album=album)
        else:
            api_query = query

        if not api_query:
            return []

        tracks = self._search_tracks_with_query(api_query, limit)

        # Safety net: Deezer's advanced syntax is `artist:"X"`-style
        # substring match, but in practice it's brittle on artist name
        # variants ("Foreigner [US]", "The Foreigner", etc.) and on
        # tracks indexed under non-canonical title spellings. When the
        # advanced query returns nothing, fall back to a free-text join
        # so the user sees the prior (less-relevant but non-empty) result
        # set rather than "No matches" — same behaviour as pre-fix for
        # this edge case. Caller-side rerank still tightens the result.
        if not tracks and used_advanced:
            fallback_parts = [p for p in (track, artist, album) if p]
            fallback_query = ' '.join(fallback_parts)
            if fallback_query and fallback_query != api_query:
                logger.debug(
                    "[Deezer] Advanced query returned 0 results, falling back "
                    "to free-text: %r → %r", api_query, fallback_query,
                )
                tracks = self._search_tracks_with_query(fallback_query, limit)

        return tracks

    def _search_tracks_with_query(self, api_query: str, limit: int) -> List[Track]:
        """Cache-aware single API call.

        Pulled out so the ``search_tracks`` orchestration can call this
        twice (advanced query → free-text fallback) without duplicating
        the cache + parse + store dance.
        """
        cache = get_metadata_cache()
        cached_results = cache.get_search_results('deezer', 'track', api_query, limit)
        if cached_results is not None:
            tracks = []
            for raw in cached_results:
                try:
                    tracks.append(Track.from_deezer_track(raw))
                except Exception as e:
                    logger.debug("Track.from_deezer_track cache parse: %s", e)
            # An empty or fully unparseable cache entry falls through to the API.
            if tracks:
                return tracks

        data = self._api_get('search/track', {'q': api_query, 'limit': min(limit, 100)})
        if not data or 'data' not in data:
            return []

        raw_items = list(data['data'])
        tracks = [Track.from_deezer_track(track_data) for track_data in raw_items]

        # Only items carrying an id are stored.
        entries = [(str(td.get('id', '')), td) for td in raw_items if td.get('id')]
        if entries:
            cache.store_entities_bulk('deezer', 'track', entries)
            cache.store_search_results('deezer', 'track', api_query, limit,
                                       [str(td.get('id', '')) for td in raw_items if td.get('id')])

        return tracks

    @staticmethod
    def _build_advanced_query(
        *,
        track: Optional[str] = None,
        artist: Optional[str] = None,
        album: Optional[str] = None,
    ) -> str:
        """Compose Deezer's advanced search syntax from field hints.

        Per Deezer's docs: https://developers.deezer.com/api/search

            q=track:"X" artist:"Y" album:"Z"

        Quotes around each value preserve multi-word phrases. Empty
        fields are skipped. Embedded double-quotes get stripped (no
        escape mechanism in Deezer's syntax) — rare in practice, but a
        search for `O"Hara` would otherwise produce a malformed query.
        """
        parts = []
        if track:
            parts.append(f'track:"{track.replace(chr(34), "")}"')
        if artist:
            parts.append(f'artist:"{artist.replace(chr(34), "")}"')
        if album:
            parts.append(f'album:"{album.replace(chr(34), "")}"')
        return ' '.join(parts)

    @rate_limited
    def search_artists(self, query: str, limit: int = 20) -> List[Artist]:
        """Search for artists — returns Artist dataclass list (metadata source interface)"""
        cache = get_metadata_cache()
        cached_results = cache.get_search_results('deezer', 'artist', query, limit)
        if cached_results is not None:
            artists = []
            for raw in cached_results:
                try:
                    artists.append(Artist.from_deezer_artist(raw))
                except Exception as e:
                    logger.debug("Artist.from_deezer_artist cache parse: %s", e)
            if artists:
                return artists

        data = self._api_get('search/artist', {'q': query, 'limit': min(limit, 100)})
        if not data or 'data' not in data:
            return []

        raw_items = list(data['data'])
        artists = [Artist.from_deezer_artist(artist_data) for artist_data in raw_items]

        entries = [(str(ad.get('id', '')), ad) for ad in raw_items if ad.get('id')]
        if entries:
            cache.store_entities_bulk('deezer', 'artist', entries)
            cache.store_search_results('deezer', 'artist', query, limit,
                                       [str(ad.get('id', '')) for ad in raw_items if ad.get('id')])

        return artists

    @rate_limited
    def search_albums(self, query: str, limit: int = 20) -> List[Album]:
        """Search for albums — returns Album dataclass list (metadata source interface)"""
        cache = get_metadata_cache()
        cached_results = cache.get_search_results('deezer', 'album', query, limit)
        if cached_results is not None:
            albums = []
            for raw in cached_results:
                try:
                    albums.append(Album.from_deezer_album(raw))
                except Exception as e:
                    logger.debug("Album.from_deezer_album cache parse: %s", e)
            if albums:
                return albums

        data = self._api_get('search/album', {'q': query, 'limit': min(limit, 100)})
        if not data or 'data' not in data:
            return []

        raw_items = list(data['data'])
        albums = [Album.from_deezer_album(album_data) for album_data in raw_items]

        entries = [(str(ad.get('id', '')), ad) for ad in raw_items if ad.get('id')]
        if entries:
            # skip_if_exists: do not overwrite album entities already cached.
            cache.store_entities_bulk('deezer', 'album', entries, skip_if_exists=True)
            cache.store_search_results('deezer', 'album', query, limit,
                                       [str(ad.get('id', '')) for ad in raw_items if ad.get('id')])

        return albums[:limit]

    def get_track_details(self, track_id: str) -> Optional[Dict[str, Any]]:
        """Get detailed track info — returns Spotify-compatible dict (metadata source interface)"""
        cache = get_metadata_cache()
        cached = cache.get_entity('deezer', 'track', str(track_id))
        if cached and cached.get('title') and _is_full_track_payload(cached):
            return self._build_enhanced_track(cached)
        # Otherwise fall through to fetch full data from API

        data = self._api_get(f'track/{track_id}')
        if not data:
            return None

        cache.store_entity('deezer', 'track', str(track_id), data)
        return self._build_enhanced_track(data)

    def _build_enhanced_track(self, track_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build Spotify-compatible enhanced track dict from raw Deezer data"""
        artist_data = track_data.get('artist', {})
        album_data = track_data.get('album', {})
        album_is_dict = isinstance(album_data, dict)

        artist_name = _artist_display_name(artist_data)
        if album_is_dict:
            album_name = album_data.get('title', '')
        else:
            album_name = str(album_data) if album_data else ''
        album_id = str(album_data.get('id', '')) if album_is_dict else ''
        total_tracks = album_data.get('nb_tracks', 0) if album_is_dict else 0

        # Use contributors for multi-artist tracks
        all_artists = _track_artist_names(track_data, artist_name)

        return {
            'id': str(track_data.get('id', '')),
            'name': track_data.get('title', ''),
            'track_number': track_data.get('track_position', 0),
            'disc_number': track_data.get('disk_number', 1),
            'duration_ms': _duration_ms(track_data),
            'explicit': track_data.get('explicit_lyrics', False),
            'artists': all_artists,
            'primary_artist': artist_name,
            'album': {
                'id': album_id,
                'name': album_name,
                'total_tracks': total_tracks,
                'release_date': track_data.get('release_date', '') or (album_data.get('release_date', '') if album_is_dict else ''),
                'album_type': 'album',
                'artists': [artist_name]
            },
            # A null nb_tracks counts as 0 rather than raising on comparison.
            'is_album_track': (total_tracks or 0) > 1,
            'raw_data': track_data
        }

    def get_track_features(self, track_id: str) -> Optional[Dict[str, Any]]:
        """Deezer does not provide audio features like Spotify"""
        return None

    def get_album_metadata(self, album_id: str, include_tracks: bool = True) -> Optional[Dict[str, Any]]:
        """Get album info — returns Spotify-compatible dict (metadata source interface).

        Matches iTunesClient.get_album() interface.
        The enrichment method below is get_album_raw() (used by deezer_worker.py).
        """
        cache = get_metadata_cache()
        cached = cache.get_entity('deezer', 'album', str(album_id))
        # Only use cache if it has full album data (release_date indicates full API response,
        # not just a search result which lacks release_date and track details)
        if cached and cached.get('title') and cached.get('release_date'):
            return self._build_album_result(cached, album_id, include_tracks)

        data = self._api_get(f'album/{album_id}')
        if not data:
            # Fall back to cached if API fails
            if cached and cached.get('title'):
                return self._build_album_result(cached, album_id, include_tracks)
            return None

        cache.store_entity('deezer', 'album', str(album_id), data)
        return self._build_album_result(data, album_id, include_tracks)

    def _build_album_result(self, album_data: Dict[str, Any], album_id: str, include_tracks: bool = True) -> Dict[str, Any]:
        """Build Spotify-compatible album result from Deezer data"""
        images = _build_image_list(album_data, _COVER_SIZES)

        artist_data = album_data.get('artist', {})
        artist_name = _artist_display_name(artist_data)
        artist_id = str(artist_data.get('id', '')) if isinstance(artist_data, dict) else ''

        album_type = _album_type_from_record_type(album_data.get('record_type', 'album'))

        # Same id (with album_id fallback) for both the 'id' and 'uri' fields.
        resolved_id = str(album_data.get('id', album_id))

        album_result = {
            'id': resolved_id,
            'name': album_data.get('title', ''),
            'images': images,
            'artists': [{'name': artist_name, 'id': artist_id}],
            'release_date': album_data.get('release_date', ''),
            'total_tracks': album_data.get('nb_tracks', 0),
            'album_type': album_type,
            'external_urls': {'deezer': album_data.get('link', '')},
            'uri': f"deezer:album:{resolved_id}",
            '_source': 'deezer',
            '_raw_data': album_data
        }

        if include_tracks:
            tracks_data = self.get_album_tracks(album_id)
            if tracks_data and 'items' in tracks_data:
                album_result['tracks'] = tracks_data
            else:
                album_result['tracks'] = {'items': [], 'total': 0}

        return album_result

    def get_album_tracks(self, album_id: str) -> Optional[Dict[str, Any]]:
        """Get album tracks — returns Spotify-compatible format (metadata source interface)

        Returns:
            ``{'items': [...], 'total': n, 'limit': n, 'next': None}`` with
            tracks sorted by (disc, track number), or None when neither the
            tracks endpoint nor the album endpoint yields a track list.
        """
        cache = get_metadata_cache()
        cached = cache.get_entity('deezer', 'album', f"{album_id}_tracks")
        if cached:
            return cached

        album_info = None
        data = self._api_get(f'album/{album_id}/tracks', {'limit': 500})
        if not data or 'data' not in data:
            # Fall back to the track list embedded in the album payload.
            album_info = self._api_get(f'album/{album_id}')
            if album_info and 'tracks' in album_info and 'data' in album_info['tracks']:
                data = album_info['tracks']
            else:
                return None

        # Get album-level info for images and name (reuse the fallback
        # response when it was already fetched above).
        if album_info is None:
            album_info = self._api_get(f'album/{album_id}')
        album_images = []
        album_name = ''
        if album_info:
            album_name = album_info.get('title', '')
            album_images = _build_image_list(album_info, _COVER_SIZES[:3])

        tracks = []
        for item in data['data']:
            artist_name = _artist_display_name(item.get('artist', {}))

            normalized_track = {
                'id': str(item.get('id', '')),
                'name': item.get('title', ''),
                'artists': [{'name': artist_name}],
                'album': {
                    'id': str(album_id),
                    'name': album_name,
                    'images': album_images,
                    'release_date': album_info.get('release_date', '') if album_info else ''
                },
                'duration_ms': _duration_ms(item),
                'track_number': item.get('track_position', 0),
                'disc_number': item.get('disk_number', 1),
                'explicit': item.get('explicit_lyrics', False),
                'preview_url': item.get('preview'),
                'uri': f"deezer:track:{item.get('id', '')}",
                'external_urls': {'deezer': item.get('link', '')},
                '_source': 'deezer'
            }
            tracks.append(normalized_track)

        tracks.sort(key=lambda t: (t.get('disc_number', 1), t.get('track_number', 0)))

        logger.info(f"Retrieved {len(tracks)} tracks for album {album_id}")

        result = {
            'items': tracks,
            'total': len(tracks),
            'limit': len(tracks),
            'next': None
        }

        cache.store_entity('deezer', 'album', f"{album_id}_tracks", result)

        # Cache individual tracks
        for item in data['data']:
            if item.get('id'):
                cache.store_entity('deezer', 'track', str(item['id']), item)

        return result

    def get_artist_top_tracks(self, artist_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the artist's top tracks in Spotify-compatible dict format.

        Wraps Deezer's `/artist/{id}/top?limit=N`. Returns dicts with the
        same shape Spotify's `artist_top_tracks` produces — id, name,
        artists, album (with album_type / total_tracks / release_date /
        images), duration_ms, track_number, disc_number — so callers
        don't need to branch on source.
        """
        if not artist_id:
            return []
        try:
            # Clamp to 1..100; unparseable limits fall back to 10.
            limit = max(1, min(int(limit or 10), 100))
        except (TypeError, ValueError):
            limit = 10

        data = self._api_get(f'artist/{artist_id}/top', {'limit': limit})
        if not data or 'data' not in data:
            return []

        tracks = []
        for track_data in data['data']:
            if not isinstance(track_data, dict):
                continue

            artist_data = track_data.get('artist') or {}
            album_data = track_data.get('album') or {}
            artist_is_dict = isinstance(artist_data, dict)
            album_is_dict = isinstance(album_data, dict)

            # Build images list from any cover sizes Deezer returned for the album
            images = _build_image_list(album_data, _COVER_SIZES) if album_is_dict else []

            # Deezer `/artist/{id}/top` results don't include record_type on the
            # nested album object; we don't have a track-count to infer from
            # either. Default 'album' so the path-builder template variable
            # always has something to substitute (existing behavior elsewhere).
            album_payload = {
                'id': str(album_data.get('id', '')) if album_is_dict else '',
                'name': album_data.get('title', '') if album_is_dict else '',
                'album_type': 'album',
                'images': images,
                'release_date': '',
                'total_tracks': 0,
                'artists': [{'name': artist_data.get('name', '')}] if artist_is_dict else [],
            }

            tracks.append({
                'id': str(track_data.get('id', '')),
                'name': track_data.get('title', ''),
                'artists': [{
                    'id': str(artist_data.get('id', '')) if artist_is_dict else '',
                    'name': artist_data.get('name', '') if artist_is_dict else '',
                }],
                'album': album_payload,
                'duration_ms': _duration_ms(track_data),  # Deezer is seconds
                'popularity': track_data.get('rank', 0),
                'preview_url': track_data.get('preview'),
                'external_urls': {'deezer': track_data['link']} if track_data.get('link') else {},
                'track_number': track_data.get('track_position'),
                'disc_number': track_data.get('disk_number', 1),
                'explicit': bool(track_data.get('explicit_lyrics', False)),
                '_source': 'deezer',
            })
        return tracks

    def get_artist_info(self, artist_id: str) -> Optional[Dict[str, Any]]:
        """Get full artist details — returns Spotify-compatible dict (metadata source interface).

        Matches iTunesClient.get_artist() interface.
        """
        cache = get_metadata_cache()
        cached = cache.get_entity('deezer', 'artist', str(artist_id))
        if cached and cached.get('name'):
            return self._build_artist_result(cached)

        data = self._api_get(f'artist/{artist_id}')
        if not data:
            return None

        cache.store_entity('deezer', 'artist', str(artist_id), data)
        return self._build_artist_result(data)

    def _build_artist_result(self, artist_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build Spotify-compatible artist result from Deezer data"""
        images = _build_image_list(artist_data, _PICTURE_SIZES)

        return {
            'id': str(artist_data.get('id', '')),
            'name': artist_data.get('name', ''),
            'images': images,
            'genres': [],
            'popularity': 0,
            'followers': {'total': artist_data.get('nb_fan', 0)},
            'external_urls': {'deezer': artist_data.get('link', '')},
            'uri': f"deezer:artist:{artist_data.get('id', '')}",
            '_source': 'deezer',
            '_raw_data': artist_data
        }

    def get_artist_albums_list(self, artist_id: str, album_type: str = 'album,single', limit: int = 200) -> List[Album]:
        """Get albums by artist ID — returns Album dataclass list (metadata source interface).

        Matches iTunesClient.get_artist_albums() interface.
        Paginates through all results up to the requested limit.

        Args:
            artist_id: Deezer artist id.
            album_type: Comma-separated album types. The default
                ``'album,single'`` applies no filtering at all; any other
                value keeps only matching types, with EPs also accepted
                when 'single' is requested.
            limit: Maximum number of raw albums to page through.
        """
        albums = []
        all_raw = []
        requested_types = [t.strip() for t in album_type.split(',')]
        offset = 0
        page_size = 100  # Deezer API max per request

        while offset < limit:
            fetch_limit = min(page_size, limit - offset)
            data = self._api_get(f'artist/{artist_id}/albums', {'limit': fetch_limit, 'index': offset})
            if not data or 'data' not in data or len(data['data']) == 0:
                break

            for album_data in data['data']:
                all_raw.append(album_data)
                album = Album.from_deezer_album(album_data)

                if album_type != 'album,single':
                    if album.album_type not in requested_types:
                        if not (album.album_type == 'ep' and 'single' in requested_types):
                            continue

                albums.append(album)

            if len(data['data']) < fetch_limit:
                break  # Last page
            offset += len(data['data'])

        cache = get_metadata_cache()
        # Deezer's /artist/{id}/albums endpoint doesn't include artist info on each album.
        # Inject it so cached album entities have artist_name for discover page display.
        artist_stub = None
        if albums and albums[0].artists:
            # str() so an integer id does not crash on .isdigit().
            artist_key = str(artist_id)
            artist_stub = {'id': int(artist_key) if artist_key.isdigit() else 0, 'name': albums[0].artists[0]}
        entries = []
        for ad in all_raw:
            if ad.get('id'):
                if artist_stub and not ad.get('artist'):
                    ad['artist'] = artist_stub
                entries.append((str(ad['id']), ad))
        if entries:
            cache.store_entities_bulk('deezer', 'album', entries, skip_if_exists=True)

        logger.info(f"Retrieved {len(albums)} albums for artist {artist_id}")
        return albums[:limit]

    # ==================== Interface Aliases (match iTunesClient method names) ====================
    # These allow SpotifyClient to call self._fallback.get_album() etc. without
    # conditional dispatch — same method names as iTunesClient.

    get_album = get_album_metadata
    get_artist = get_artist_info
    get_artist_albums = get_artist_albums_list

    def _get_artist_image_from_albums(self, artist_id: str) -> Optional[str]:
        """Compatibility with iTunesClient — Deezer artists have direct image URLs."""
        artist_data = self._api_get(f'artist/{artist_id}')
        if artist_data:
            return artist_data.get('picture_xl') or artist_data.get('picture_big') or artist_data.get('picture_medium')
        return None

    # ==================== User Methods (require OAuth) ====================

    @rate_limited
    def get_user_favorite_artists(self, limit: int = 200) -> list:
        """Fetch user's favorite artists from Deezer. Requires OAuth access token.

        Returns list of dicts with deezer_id, name, image_url. Returns an
        empty list when no token is loaded or on any error.
        """
        if not self._access_token:
            logger.debug("Deezer not user-authenticated — cannot fetch favorites")
            return []

        try:
            artists = []
            index = 0
            while len(artists) < limit:
                data = self._api_get('user/me/artists', params={
                    'limit': min(100, limit - len(artists)),
                    'index': index
                })
                if not data or 'data' not in data:
                    break
                items = data['data']
                if not items:
                    break
                for a in items:
                    artists.append({
                        'deezer_id': str(a.get('id', '')),
                        'name': a.get('name', ''),
                        'image_url': a.get('picture_xl') or a.get('picture_big') or a.get('picture_medium', ''),
                    })
                # Stop when the response carries no 'next' page marker.
                if not data.get('next'):
                    break
                index += len(items)
                time.sleep(0.3)  # Extra breathing room

            logger.info(f"Retrieved {len(artists)} favorite artists from Deezer")
            return artists
        except Exception as e:
            logger.error(f"Error fetching Deezer favorite artists: {e}")
            return []

    @rate_limited
    def get_user_favorite_albums(self, limit: int = 200) -> list:
        """Fetch user's favorite albums from Deezer. Requires OAuth access token.

        Returns list of dicts with deezer_id, album_name, artist_name,
        image_url, release_date, total_tracks. Returns an empty list when no
        token is loaded or on any error.
        """
        if not self._access_token:
            logger.debug("Deezer not user-authenticated — cannot fetch favorite albums")
            return []

        try:
            albums = []
            index = 0
            while len(albums) < limit:
                data = self._api_get('user/me/albums', params={
                    'limit': min(100, limit - len(albums)),
                    'index': index
                })
                if not data or 'data' not in data:
                    break
                items = data['data']
                if not items:
                    break
                for a in items:
                    artist_name = ''
                    if isinstance(a.get('artist'), dict):
                        artist_name = a['artist'].get('name', '')
                    albums.append({
                        'deezer_id': str(a.get('id', '')),
                        'album_name': a.get('title', ''),
                        'artist_name': artist_name,
                        'image_url': a.get('cover_xl') or a.get('cover_big') or a.get('cover_medium', ''),
                        'release_date': a.get('release_date', ''),
                        'total_tracks': a.get('nb_tracks', 0),
                    })
                if not data.get('next'):
                    break
                index += len(items)
                time.sleep(0.3)

            logger.info(f"Retrieved {len(albums)} favorite albums from Deezer")
            return albums
        except Exception as e:
            logger.error(f"Error fetching Deezer favorite albums: {e}")
            return []

    # ==================== Stub Methods (match iTunesClient interface) ====================

    def get_user_playlists(self) -> List[Playlist]:
        """Not supported — Deezer playlists require auth"""
        return []

    def get_user_playlists_metadata_only(self) -> List[Playlist]:
        """Not supported"""
        return []

    def get_saved_tracks_count(self) -> int:
        """Not supported"""
        return 0

    def get_saved_tracks(self) -> List[Track]:
        """Not supported"""
        return []

    def get_playlist_by_id(self, playlist_id: str) -> Optional[Playlist]:
        """Not supported"""
        return None

    def get_user_info(self) -> Optional[Dict[str, Any]]:
        """Not supported — requires auth"""
        return None

    # ==================== Existing Enrichment Methods ====================
    # These methods are used by deezer_worker.py and web_server.py enrichment endpoints.
    # They have different signatures from the metadata-source methods above.

@rate_limited def search_artist(self, artist_name: str) -> Optional[Dict[str, Any]]: """ Search for an artist by name (enrichment interface). Args: artist_name: Name of the artist to search for Returns: Artist dict from Deezer or None if not found """ try: response = self.session.get( f"{self.BASE_URL}/search/artist", params={'q': artist_name, 'strict': 'on'}, timeout=10 ) response.raise_for_status() data = response.json() if 'error' in data: logger.error(f"Deezer API error searching artist '{artist_name}': {data['error']}") return None results = data.get('data', []) if results and len(results) > 0: result = results[0] # Cache the artist entity try: cache = get_metadata_cache() cache.store_entity('deezer', 'artist', str(result.get('id', '')), result) except Exception as e: logger.debug("cache store_entity artist search: %s", e) logger.debug(f"Found artist for query: {artist_name}") return result logger.debug(f"No artist found for query: {artist_name}") return None except Exception as e: logger.error(f"Error searching for artist '{artist_name}': {e}") return None @rate_limited def search_album(self, artist_name: str, album_title: str) -> Optional[Dict[str, Any]]: """ Search for an album by artist name and album title (enrichment interface). 
Args: artist_name: Name of the artist album_title: Title of the album Returns: Album dict from Deezer or None if not found """ try: query = f"{artist_name} {album_title}" response = self.session.get( f"{self.BASE_URL}/search/album", params={'q': query}, timeout=10 ) response.raise_for_status() data = response.json() if 'error' in data: logger.error(f"Deezer API error searching album '{query}': {data['error']}") return None results = data.get('data', []) if results and len(results) > 0: result = results[0] # Cache the album entity try: cache = get_metadata_cache() cache.store_entity('deezer', 'album', str(result.get('id', '')), result) except Exception as e: logger.debug("cache store_entity album search: %s", e) logger.debug(f"Found album for query: {artist_name} - {album_title}") return result logger.debug(f"No album found for query: {artist_name} - {album_title}") return None except Exception as e: logger.error(f"Error searching for album '{artist_name} - {album_title}': {e}") return None @rate_limited def search_track(self, artist_name: str, track_title: str) -> Optional[Dict[str, Any]]: """ Search for a track by artist name and track title (enrichment interface). 
Args: artist_name: Name of the artist track_title: Title of the track Returns: Track dict from Deezer or None if not found """ try: query = f'artist:"{artist_name}" track:"{track_title}"' response = self.session.get( f"{self.BASE_URL}/search", params={'q': query}, timeout=10 ) response.raise_for_status() data = response.json() if 'error' in data: logger.error(f"Deezer API error searching track '{query}': {data['error']}") return None results = data.get('data', []) if results and len(results) > 0: result = results[0] # Cache the track entity try: cache = get_metadata_cache() cache.store_entity('deezer', 'track', str(result.get('id', '')), result) except Exception as e: logger.debug("cache store_entity track search: %s", e) logger.debug(f"Found track for query: {artist_name} - {track_title}") return result logger.debug(f"No track found for query: {artist_name} - {track_title}") return None except Exception as e: logger.error(f"Error searching for track '{artist_name} - {track_title}': {e}") return None @rate_limited def get_album_raw(self, album_id: int) -> Optional[Dict[str, Any]]: """ Get full album details by ID — raw Deezer format (enrichment interface). Used by deezer_worker.py for label/genre/explicit enrichment. Checks metadata cache first to avoid redundant API calls. 
Args: album_id: Deezer album ID Returns: Full album dict with label, genres, explicit flag or None """ # Check cache first — get_album_raw is called on every enrichment cycle try: cache = get_metadata_cache() cached = cache.get_entity('deezer', 'album', str(album_id)) if cached and cached.get('label'): # Cache hit with full details (has label = was a get_album response, not just search) logger.debug(f"Cache hit for album {album_id}") return cached except Exception as e: logger.debug("cache get_entity album: %s", e) try: response = self.session.get( f"{self.BASE_URL}/album/{album_id}", timeout=10 ) response.raise_for_status() data = response.json() if 'error' in data: logger.error(f"Deezer API error getting album {album_id}: {data['error']}") return None # Cache the full album (includes genres, label, explicit) try: cache = get_metadata_cache() cache.store_entity('deezer', 'album', str(album_id), data) except Exception as e: logger.debug("cache store_entity album full: %s", e) logger.debug(f"Got full album details for ID: {album_id}") return data except Exception as e: logger.error(f"Error getting album {album_id}: {e}") return None @rate_limited def get_track_raw(self, track_id: int) -> Optional[Dict[str, Any]]: """ Get full track details by ID — raw Deezer format (enrichment interface, includes BPM). Used by deezer_worker.py for BPM enrichment. Checks metadata cache first to avoid redundant API calls. 
Args: track_id: Deezer track ID Returns: Full track dict with BPM or None """ # Check cache first try: cache = get_metadata_cache() cached = cache.get_entity('deezer', 'track', str(track_id)) if cached and cached.get('bpm'): logger.debug(f"Cache hit for track {track_id}") return cached except Exception as e: logger.debug("cache get_entity track: %s", e) try: response = self.session.get( f"{self.BASE_URL}/track/{track_id}", timeout=10 ) response.raise_for_status() data = response.json() if 'error' in data: logger.error(f"Deezer API error getting track {track_id}: {data['error']}") return None # Cache the full track (includes BPM, ISRC, etc.) try: cache = get_metadata_cache() cache.store_entity('deezer', 'track', str(track_id), data) except Exception as e: logger.debug("cache store_entity track full: %s", e) logger.debug(f"Got full track details for ID: {track_id}") return data except Exception as e: logger.error(f"Error getting track {track_id}: {e}") return None @rate_limited def get_playlist(self, playlist_id) -> Optional[Dict[str, Any]]: """ Get a playlist with all its tracks by ID. Fetches playlist metadata and tracks, paginating if the playlist contains more tracks than a single response returns (400 per page). 
Args: playlist_id: Deezer playlist ID (string or int) Returns: Dict with id, name, description, track_count, image_url, owner, and tracks list, or None on error """ try: playlist_id = str(playlist_id) response = self.session.get( f"{self.BASE_URL}/playlist/{playlist_id}", timeout=15 ) response.raise_for_status() data = response.json() if 'error' in data: logger.error(f"Deezer API error getting playlist {playlist_id}: {data['error']}") return None total_tracks = data.get('nb_tracks', 0) raw_tracks = data.get('tracks', {}).get('data', []) # Paginate if we didn't get all tracks while len(raw_tracks) < total_tracks: index = len(raw_tracks) logger.debug(f"Paginating playlist {playlist_id} tracks at index {index}") page_response = self.session.get( f"{self.BASE_URL}/playlist/{playlist_id}/tracks", params={'index': index, 'limit': 400}, timeout=15 ) page_response.raise_for_status() page_data = page_response.json() if 'error' in page_data: logger.warning(f"Error paginating playlist tracks at index {index}: {page_data['error']}") break page_tracks = page_data.get('data', []) if not page_tracks: break raw_tracks.extend(page_tracks) # Normalize tracks tracks: List[Dict[str, Any]] = [] for i, t in enumerate(raw_tracks, start=1): artist_name = t.get('artist', {}).get('name', 'Unknown Artist') # Some tracks list multiple artists separated by commas or slashes tracks.append({ 'id': str(t.get('id', '')), 'name': t.get('title', ''), 'artists': [artist_name], 'album': t.get('album', {}).get('title', ''), 'duration_ms': t.get('duration', 0) * 1000, 'track_number': i, }) result = { 'id': str(data.get('id', '')), 'name': data.get('title', ''), 'description': data.get('description', ''), 'track_count': total_tracks, 'image_url': data.get('picture_xl') or data.get('picture_big') or data.get('picture_medium', ''), 'owner': data.get('creator', {}).get('name', ''), 'tracks': tracks, } logger.info(f"Fetched playlist '{result['name']}' with {len(tracks)} tracks") return result except 
Exception as e: logger.error(f"Error getting playlist {playlist_id}: {e}") return None @staticmethod def parse_playlist_url(url: str) -> Optional[str]: """ Extract a Deezer playlist ID from a URL or raw numeric string. Supported formats: https://www.deezer.com/playlist/1234567890 https://www.deezer.com/en/playlist/1234567890 https://deezer.com/playlist/1234567890 1234567890 Args: url: Deezer playlist URL or numeric ID Returns: Playlist ID as a string, or None if the input is invalid """ if not url or not isinstance(url, str): return None url = url.strip() # Raw numeric ID if url.isdigit(): return url # URL pattern: optional www, optional locale segment, /playlist/{id} match = re.match( r'https?://(?:www\.)?deezer\.com/(?:[a-z]{2}/)?playlist/(\d+)', url ) if match: return match.group(1) return None