"""Deezer Download Client — download tracks from Deezer using ARL authentication. Follows the same interface contract as Tidal, Qobuz, YouTube, and HiFi clients. Supports FLAC (HiFi subscription), MP3 320 (Premium), and MP3 128 (Free) with automatic quality fallback. Authentication: User provides an ARL token (browser cookie from deezer.com). """ import asyncio import hashlib import json import os import struct import threading import time import uuid from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import requests from core.download_plugins.types import AlbumResult, DownloadStatus, TrackResult from utils.logging_config import get_logger logger = get_logger("deezer_download") # Deezer API endpoints _GW_API = "https://www.deezer.com/ajax/gw-light.php" _MEDIA_API = "https://media.deezer.com/v1/get_url" # Blowfish decryption secret (public knowledge, used by all Deezer clients) _BF_SECRET = b"g4el58wc0zvf9na1" # Quality format codes for media API _QUALITY_FORMATS = { 'flac': {'cipher': 'BF_CBC_STRIPE', 'format': 'FLAC'}, 'mp3_320': {'cipher': 'BF_CBC_STRIPE', 'format': 'MP3_320'}, 'mp3_128': {'cipher': 'BF_CBC_STRIPE', 'format': 'MP3_128'}, } # Quality preference order (highest first) _QUALITY_ORDER = ['flac', 'mp3_320', 'mp3_128'] # Chunk size for Blowfish decryption (Deezer standard) _CHUNK_SIZE = 2048 # Minimum valid file size (100KB — anything smaller is likely an error) _MIN_FILE_SIZE = 100 * 1024 def _get_blowfish_key(track_id: str) -> bytes: """Derive the Blowfish decryption key for a track.""" md5_hex = hashlib.md5(str(track_id).encode()).hexdigest() return bytes([ ord(md5_hex[i]) ^ ord(md5_hex[i + 16]) ^ _BF_SECRET[i] for i in range(16) ]) def _decrypt_chunk(chunk: bytes, key: bytes) -> bytes: """Decrypt a single chunk using Blowfish CBC with null IV.""" try: from Crypto.Cipher import Blowfish iv = b'\x00\x01\x02\x03\x04\x05\x06\x07' cipher = Blowfish.new(key, Blowfish.MODE_CBC, iv) return cipher.decrypt(chunk) except ImportError: try: from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes iv = b'\x00\x01\x02\x03\x04\x05\x06\x07' cipher = Cipher(algorithms.Blowfish(key), modes.CBC(iv)) decryptor = cipher.decryptor() return decryptor.update(chunk) + decryptor.finalize() except ImportError as exc: raise ImportError( "Deezer downloads require pycryptodome or cryptography package. " "Install with: pip install pycryptodome" ) from exc from core.download_plugins.base import DownloadSourcePlugin class DeezerDownloadClient(DownloadSourcePlugin): """Deezer download client using ARL token authentication.""" def __init__(self, download_path: str = None): from config.settings import config_manager self._config = config_manager if download_path is None: download_path = config_manager.get('soulseek.download_path', './downloads') self.download_path = Path(download_path) self.download_path.mkdir(parents=True, exist_ok=True) # Engine reference is populated by set_engine() at registration # time. None until orchestrator wires the registry. self._engine = None # Shutdown check callback (set by web_server) self.shutdown_check = None # Rate limiting self._last_request = 0 self._min_interval = 0.5 # 500ms between API calls self._api_lock = threading.Lock() # Session state self._session = requests.Session() self._session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ' '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.9', }) self._api_token = None self._license_token = None self._user_data = None self._authenticated = False # Quality preference self._quality = config_manager.get('deezer_download.quality', 'flac') # Try to authenticate on init if ARL is configured arl = config_manager.get('deezer_download.arl', '') if arl: self._authenticate(arl) logger.info(f"Deezer download client initialized (download path: {self.download_path})") def set_engine(self, engine): """Engine callback — wires the central thread worker + state store.""" self._engine = engine # ─── Authentication ────────────────────────────────────────── def _authenticate(self, arl: str) -> bool: """Authenticate with Deezer using ARL cookie token.""" try: self._session.cookies.set('arl', arl) # Get user data and API token resp = self._gw_call('deezer.getUserData') if not resp: logger.error("Failed to get user data from Deezer") return False user = resp.get('USER', {}) user_id = user.get('USER_ID', 0) if not user_id or user_id == 0: logger.error("Invalid ARL token — Deezer returned no user") return False self._api_token = resp.get('checkForm', '') self._license_token = user.get('OPTIONS', {}).get('license_token', '') self._user_data = user self._authenticated = True user_name = user.get('BLOG_NAME', 'Unknown') can_stream_lossless = user.get('OPTIONS', {}).get('web_lossless', False) can_stream_hq = user.get('OPTIONS', {}).get('web_hq', False) tier = 'Free' if can_stream_lossless: tier = 'HiFi' elif can_stream_hq: tier = 'Premium' logger.info(f"Deezer authenticated as '{user_name}' (tier: {tier})") return True except Exception as e: logger.error(f"Deezer authentication failed: {e}") self._authenticated = False return False def _gw_call(self, method: str, params: dict = None) -> Optional[dict]: """Call the Deezer gateway API.""" with self._api_lock: elapsed = time.time() - self._last_request if elapsed < self._min_interval: time.sleep(self._min_interval - elapsed) self._last_request = time.time() try: url_params = {'method': method, 'api_version': '1.0'} url_params['api_token'] = self._api_token if self._api_token else 'null' resp = self._session.post( _GW_API, params=url_params, json=params or {}, timeout=15 ) resp.raise_for_status() data = resp.json() if data.get('error'): error = data['error'] if isinstance(error, dict): error_msg = error.get('VALID_TOKEN_REQUIRED') or error.get('GATEWAY_ERROR') or str(error) else: error_msg = str(error) if error_msg: logger.warning(f"Deezer API error ({method}): {error_msg}") return None return data.get('results', {}) except Exception as e: logger.error(f"Deezer API call failed ({method}): {e}") return None # ─── Status & Config ───────────────────────────────────────── def set_shutdown_check(self, check_callable): self.shutdown_check = check_callable def is_configured(self) -> bool: return self._authenticated def is_available(self) -> bool: return self._authenticated def is_authenticated(self) -> bool: return self._authenticated async def check_connection(self) -> bool: loop = asyncio.get_event_loop() return await loop.run_in_executor(None, self.is_available) def reconnect(self, arl: str = None) -> bool: """Re-authenticate with a new or existing ARL.""" if arl is None: arl = self._config.get('deezer_download.arl', '') if not arl: return False self._authenticated = False return self._authenticate(arl) def get_quality_label(self) -> str: """Get human-readable label for current quality setting.""" labels = {'flac': 'FLAC (Lossless)', 'mp3_320': 'MP3 320kbps', 'mp3_128': 'MP3 128kbps'} return labels.get(self._quality, 'MP3 320kbps') # ─── User Playlists (ARL-authenticated) ───────────────────── def get_user_playlists(self) -> list: """Fetch the authenticated user's playlists via Deezer public API with ARL cookies.""" if not self._authenticated or not self._user_data: return [] user_id = self._user_data.get('USER_ID') if not user_id: return [] playlists = [] index = 0 while True: try: resp = self._session.get( f'https://api.deezer.com/user/{user_id}/playlists', params={'index': index, 'limit': 100}, timeout=15 ) resp.raise_for_status() data = resp.json() if 'error' in data: logger.warning(f"Deezer playlists error: {data['error']}") break items = data.get('data', []) if not items: break for p in items: playlists.append({ 'id': str(p.get('id', '')), 'name': p.get('title', ''), 'track_count': p.get('nb_tracks', 0), 'image_url': p.get('picture_medium', ''), 'owner': p.get('creator', {}).get('name', ''), 'description': p.get('description', ''), }) if not data.get('next'): break index += len(items) except Exception as e: logger.error(f"Error fetching user playlists at index {index}: {e}") break logger.info(f"Fetched {len(playlists)} user playlists from Deezer") return playlists def get_user_favorite_artists(self, limit: int = 200) -> list: """Fetch the authenticated user's favorite artists via public API with ARL cookies.""" if not self._authenticated or not self._user_data: return [] user_id = self._user_data.get('USER_ID') if not user_id: return [] artists = [] index = 0 while len(artists) < limit: try: resp = self._session.get( f'https://api.deezer.com/user/{user_id}/artists', params={'index': index, 'limit': min(100, limit - len(artists))}, timeout=15 ) resp.raise_for_status() data = resp.json() if 'error' in data: logger.warning(f"Deezer artists error: {data['error']}") break items = data.get('data', []) if not items: break for a in items: artists.append({ 'deezer_id': str(a.get('id', '')), 'name': a.get('name', ''), 'image_url': a.get('picture_xl') or a.get('picture_big') or a.get('picture_medium', ''), }) if not data.get('next'): break index += len(items) except Exception as e: logger.error(f"Error fetching favorite artists at index {index}: {e}") break logger.info(f"Fetched {len(artists)} favorite artists from Deezer (ARL)") return artists def get_user_favorite_albums(self, limit: int = 200) -> list: """Fetch the authenticated user's favorite albums via public API with ARL cookies.""" if not self._authenticated or not self._user_data: return [] user_id = self._user_data.get('USER_ID') if not user_id: return [] albums = [] index = 0 while len(albums) < limit: try: resp = self._session.get( f'https://api.deezer.com/user/{user_id}/albums', params={'index': index, 'limit': min(100, limit - len(albums))}, timeout=15 ) resp.raise_for_status() data = resp.json() if 'error' in data: logger.warning(f"Deezer albums error: {data['error']}") break items = data.get('data', []) if not items: break for a in items: artist_name = '' if isinstance(a.get('artist'), dict): artist_name = a['artist'].get('name', '') albums.append({ 'deezer_id': str(a.get('id', '')), 'album_name': a.get('title', ''), 'artist_name': artist_name, 'image_url': a.get('cover_xl') or a.get('cover_big') or a.get('cover_medium', ''), 'release_date': a.get('release_date', ''), 'total_tracks': a.get('nb_tracks', 0), }) if not data.get('next'): break index += len(items) except Exception as e: logger.error(f"Error fetching favorite albums at index {index}: {e}") break logger.info(f"Fetched {len(albums)} favorite albums from Deezer (ARL)") return albums def get_playlist_tracks(self, playlist_id: str) -> Optional[dict]: """Fetch full playlist details with tracks via public API (ARL cookies grant private access).""" try: resp = self._session.get( f'https://api.deezer.com/playlist/{playlist_id}', timeout=15 ) resp.raise_for_status() data = resp.json() if 'error' in data: logger.error(f"Deezer playlist error: {data['error']}") return None total_tracks = data.get('nb_tracks', 0) raw_tracks = data.get('tracks', {}).get('data', []) # Paginate if needed while len(raw_tracks) < total_tracks: idx = len(raw_tracks) page_resp = self._session.get( f'https://api.deezer.com/playlist/{playlist_id}/tracks', params={'index': idx, 'limit': 400}, timeout=15 ) page_resp.raise_for_status() page_data = page_resp.json() if 'error' in page_data: break page_tracks = page_data.get('data', []) if not page_tracks: break raw_tracks.extend(page_tracks) # Batch-fetch release dates for unique albums (cache-first) album_ids = set() for t in raw_tracks: aid = t.get('album', {}).get('id') if aid: album_ids.add(str(aid)) album_release_dates = {} try: from core.metadata.cache import get_metadata_cache cache = get_metadata_cache() except Exception: cache = None for aid in album_ids: # Check metadata cache first if cache: try: cached = cache.get_entity('deezer', 'album', aid) if cached and cached.get('release_date'): album_release_dates[aid] = cached['release_date'] continue except Exception as e: logger.debug("cache get_entity album release_date: %s", e) # Cache miss — fetch from API try: time.sleep(0.3) # Respect rate limits a_resp = self._session.get(f'https://api.deezer.com/album/{aid}', timeout=10) if a_resp.ok: a_data = a_resp.json() album_release_dates[aid] = a_data.get('release_date', '') # Store in metadata cache for future use if cache: try: cache.store_entity('deezer', 'album', aid, a_data) except Exception as e: logger.debug("cache store_entity album release_date: %s", e) except Exception as e: logger.debug("fetch deezer album release_date %s: %s", aid, e) tracks = [] for i, t in enumerate(raw_tracks, start=1): artist_name = t.get('artist', {}).get('name', 'Unknown Artist') album_data = t.get('album', {}) album_cover = album_data.get('cover_medium') or album_data.get('cover_small') or '' album_id = str(album_data.get('id', '')) tracks.append({ 'id': str(t.get('id', '')), 'name': t.get('title', ''), 'artists': [{'name': artist_name}], 'album': { 'name': album_data.get('title', ''), 'images': [{'url': album_cover}] if album_cover else [], 'release_date': album_release_dates.get(album_id, ''), 'album_type': 'album', 'total_tracks': total_tracks, 'id': album_id, }, 'duration_ms': t.get('duration', 0) * 1000, 'track_number': i, }) return { 'id': str(data.get('id', '')), 'name': data.get('title', ''), 'description': data.get('description', ''), 'track_count': total_tracks, 'image_url': data.get('picture_medium', ''), 'owner': data.get('creator', {}).get('name', ''), 'tracks': tracks, } except Exception as e: logger.error(f"Error fetching playlist {playlist_id}: {e}") return None # ─── Track Info ────────────────────────────────────────────── def _get_track_data(self, track_id: str) -> Optional[dict]: """Get full track data from Deezer private API.""" return self._gw_call('song.getData', {'sng_id': str(track_id)}) def _get_media_url(self, track_token: str, quality: str) -> Optional[str]: """Get the download URL for a track at the specified quality.""" if not self._license_token: logger.error("No license token — cannot get media URL") return None fmt = _QUALITY_FORMATS.get(quality) if not fmt: logger.error(f"Unknown quality: {quality}") return None try: payload = { 'license_token': self._license_token, 'media': [{ 'type': 'FULL', 'formats': [fmt] }], 'track_tokens': [track_token] } resp = self._session.post(_MEDIA_API, json=payload, timeout=15) resp.raise_for_status() data = resp.json() media_list = data.get('data', []) if not media_list: return None media = media_list[0].get('media', []) if not media: return None sources = media[0].get('sources', []) if not sources: return None # Prefer the first URL return sources[0].get('url') except Exception as e: logger.error(f"Failed to get media URL: {e}") return None # ─── Search ────────────────────────────────────────────────── async def search(self, query: str, timeout: int = None, progress_callback=None) -> Tuple[List[TrackResult], List[AlbumResult]]: """Search Deezer for tracks matching the query.""" loop = asyncio.get_event_loop() return await loop.run_in_executor(None, self._search_sync, query) def _search_sync(self, query: str) -> Tuple[List[TrackResult], List[AlbumResult]]: """Synchronous search implementation.""" if not self._authenticated: logger.warning("Deezer not authenticated — cannot search") return [], [] try: resp = self._session.get( 'https://api.deezer.com/search', params={'q': query, 'limit': 30}, timeout=10 ) resp.raise_for_status() data = resp.json() results = [] for item in data.get('data', []): track_id = str(item.get('id', '')) if not track_id: continue artist = item.get('artist', {}).get('name', 'Unknown') title = item.get('title', 'Unknown') album = item.get('album', {}).get('title', '') duration_ms = (item.get('duration', 0)) * 1000 # Deezer returns seconds # Estimate size based on quality duration_s = item.get('duration', 0) if self._quality == 'flac': est_size = duration_s * 176400 # ~1411kbps bitrate = 1411 quality = 'flac' elif self._quality == 'mp3_320': est_size = duration_s * 40000 # ~320kbps bitrate = 320 quality = 'mp3' else: est_size = duration_s * 16000 # ~128kbps bitrate = 128 quality = 'mp3' results.append(TrackResult( username='deezer_dl', filename=f"{track_id}||{artist} - {title}", size=est_size, bitrate=bitrate, duration=duration_ms, quality=quality, free_upload_slots=999, upload_speed=999999, queue_length=0, artist=artist, title=title, album=album, track_number=item.get('track_position'), )) logger.info(f"Deezer search for '{query}' returned {len(results)} results") return results, [] except Exception as e: logger.error(f"Deezer search failed: {e}") return [], [] # ─── Download ──────────────────────────────────────────────── async def download(self, username: str, filename: str, file_size: int = 0) -> Optional[str]: """Start a download. Returns download_id immediately.""" if not self._authenticated: logger.error("Deezer not authenticated — cannot download") return None if self._engine is None: # Raise rather than return None so the orchestrator's # download_with_fallback surfaces a real warning + tries # the next source. Returning None silently dropped the # download with no user feedback (per JohnBaumb). raise RuntimeError("Deezer client has no engine reference — cannot dispatch download") # Parse filename: "track_id||display_name" parts = filename.split('||', 1) track_id = parts[0] display_name = parts[1] if len(parts) > 1 else f"Track {track_id}" return self._engine.worker.dispatch( source_name='deezer', target_id=track_id, display_name=display_name, original_filename=filename, impl_callable=self._download_sync, extra_record_fields={ 'track_id': track_id, 'display_name': display_name, 'size': file_size, 'error': None, }, # Legacy username slot — frontend status indicators key off # ``deezer_dl``, not the canonical ``deezer``. username_override='deezer_dl', # Diagnostic thread name for multi-thread debugging. thread_name=f'deezer-dl-{track_id}', ) def _set_error(self, download_id: str, message: str) -> None: """Helper: set the engine record's `error` slot. No-op if engine isn't wired or record was already removed.""" if self._engine is None: return self._engine.update_record('deezer', download_id, {'error': message}) def _is_cancelled(self, download_id: str) -> bool: if self._engine is None: return False record = self._engine.get_record('deezer', download_id) return record is not None and record.get('state') == 'Cancelled' def _download_sync(self, download_id: str, track_id: str, display_name: str) -> Optional[str]: """Synchronous download: get URL, download, decrypt, save.""" # Check for shutdown if self.shutdown_check and self.shutdown_check(): if self._engine is not None: self._engine.update_record('deezer', download_id, {'state': 'Aborted'}) return None # Get track data from private API track_data = self._get_track_data(track_id) if not track_data: self._set_error(download_id, 'Failed to get track data') return None track_token = track_data.get('TRACK_TOKEN', '') if not track_token: self._set_error(download_id, 'No track token available') return None # Determine quality and get media URL with fallback media_url = None actual_quality = None allow_fallback = self._config.get('deezer_download.allow_fallback', True) if allow_fallback: quality_order = _QUALITY_ORDER.copy() try: pref_idx = quality_order.index(self._quality) quality_order = quality_order[pref_idx:] + quality_order[:pref_idx] except ValueError: pass else: quality_order = [self._quality] for q in quality_order: url = self._get_media_url(track_token, q) if url: media_url = url actual_quality = q break if not media_url: self._set_error(download_id, 'No media URL available (may require higher subscription tier)') return None if actual_quality != self._quality: logger.info(f"Quality fallback: {self._quality} → {actual_quality} for {display_name}") ext = '.flac' if actual_quality == 'flac' else '.mp3' safe_name = self._sanitize_filename(display_name) out_path = str(self.download_path / f"{safe_name}{ext}") # Download and decrypt try: bf_key = _get_blowfish_key(track_id) resp = self._session.get(media_url, stream=True, timeout=30) resp.raise_for_status() total_size = int(resp.headers.get('content-length', 0)) if self._engine is not None: self._engine.update_record('deezer', download_id, {'size': total_size}) downloaded = 0 chunk_index = 0 start_time = time.time() with open(out_path, 'wb') as f: for raw_chunk in resp.iter_content(chunk_size=_CHUNK_SIZE): if not raw_chunk: continue # Check for cancellation/shutdown if self.shutdown_check and self.shutdown_check(): if self._engine is not None: self._engine.update_record('deezer', download_id, {'state': 'Aborted'}) try: os.remove(out_path) except OSError: pass return None if self._is_cancelled(download_id): try: os.remove(out_path) except OSError: pass return None # Decrypt every 3rd chunk (Deezer's encryption pattern) if chunk_index % 3 == 0 and len(raw_chunk) == _CHUNK_SIZE: chunk_to_write = _decrypt_chunk(raw_chunk, bf_key) else: chunk_to_write = raw_chunk f.write(chunk_to_write) downloaded += len(raw_chunk) chunk_index += 1 # Update progress elapsed = time.time() - start_time speed = int(downloaded / elapsed) if elapsed > 0 else 0 progress = (downloaded / total_size * 100) if total_size > 0 else 0 if self._engine is not None: self._engine.update_record('deezer', download_id, { 'transferred': downloaded, 'progress': min(progress, 99.9), 'speed': speed, }) # Validate file size file_size = os.path.getsize(out_path) if file_size < _MIN_FILE_SIZE: logger.warning(f"Downloaded file too small ({file_size} bytes): {out_path}") try: os.remove(out_path) except OSError: pass self._set_error(download_id, f'File too small ({file_size} bytes)') return None logger.info(f"Deezer download complete: {out_path} ({file_size / 1048576:.1f} MB, {actual_quality})") return out_path except Exception as e: logger.error(f"Download error for {display_name}: {e}") try: os.remove(out_path) except OSError: pass self._set_error(download_id, str(e)) return None # ─── Download Status ───────────────────────────────────────── def _record_to_status(self, record: dict) -> DownloadStatus: return DownloadStatus( id=record['id'], filename=record['filename'], username=record['username'], state=record['state'], progress=record['progress'], size=record.get('size', 0), transferred=record.get('transferred', 0), speed=record.get('speed', 0), file_path=record.get('file_path'), ) async def get_all_downloads(self) -> List[DownloadStatus]: if self._engine is None: return [] return [ self._record_to_status(record) for record in self._engine.iter_records_for_source('deezer') ] async def get_download_status(self, download_id: str) -> Optional[DownloadStatus]: if self._engine is None: return None record = self._engine.get_record('deezer', download_id) return self._record_to_status(record) if record is not None else None async def cancel_download(self, download_id: str, username: str = None, remove: bool = False) -> bool: if self._engine is None: return False if self._engine.get_record('deezer', download_id) is None: return False self._engine.update_record('deezer', download_id, {'state': 'Cancelled'}) if remove: self._engine.remove_record('deezer', download_id) return True async def clear_all_completed_downloads(self) -> bool: if self._engine is None: return True terminal = {'Completed, Succeeded', 'Cancelled', 'Errored', 'Aborted'} for record in list(self._engine.iter_records_for_source('deezer')): if record.get('state') in terminal: self._engine.remove_record('deezer', record['id']) return True # ─── Utilities ─────────────────────────────────────────────── @staticmethod def _sanitize_filename(name: str) -> str: """Sanitize a string for use as a filename.""" import re name = re.sub(r'[<>:"/\\|?*]', '', name) name = name.strip('. ') return name[:200] if name else 'unknown'