import os import requests import time import re import threading from typing import Dict, List, Optional, Any from functools import wraps from dataclasses import dataclass from utils.logging_config import get_logger from config.settings import config_manager import json import base64 import webbrowser import urllib.parse from http.server import HTTPServer, BaseHTTPRequestHandler import socketserver import hashlib import secrets logger = get_logger("tidal_client") # Virtual playlist identity for the user's Favorite Tracks (Tidal's # "My Collection" view). Treated like a normal playlist by every # get_playlist consumer (mirror auto-refresh, discovery, sync UI) — # the client recognizes the ID and dispatches to the dedicated # `userCollectionTracks` endpoint internally. ID intentionally has no # colon: the sync-services.js renderer interpolates IDs into CSS # selectors via template literals (e.g. `#tidal-card-${p.id} .foo`) # and a `:` in the ID would be parsed as a CSS pseudo-class operator. COLLECTION_PLAYLIST_ID = "tidal-favorites" COLLECTION_PLAYLIST_NAME = "Favorite Tracks" COLLECTION_PLAYLIST_DESCRIPTION = "Your favorited tracks on Tidal" # Global rate limiting variables _last_api_call_time = 0 _api_call_lock = threading.Lock() MIN_API_INTERVAL = 0.5 # 500ms between API calls def rate_limited(func): """Decorator to enforce rate limiting on Tidal API calls with retry logic""" @wraps(func) def wrapper(*args, **kwargs): max_retries = 4 last_exception = None for attempt in range(max_retries): global _last_api_call_time with _api_call_lock: current_time = time.time() time_since_last_call = current_time - _last_api_call_time if time_since_last_call < MIN_API_INTERVAL: sleep_time = MIN_API_INTERVAL - time_since_last_call time.sleep(sleep_time) _last_api_call_time = time.time() from core.api_call_tracker import api_call_tracker api_call_tracker.record_call('tidal') try: result = func(*args, **kwargs) return result except Exception as e: last_exception = e error_str = str(e) # Only retry on specific errors if "rate limit" in error_str.lower() or "429" in error_str: backoff = 3.0 * (2 ** attempt) # Exponential: 3s, 6s, 12s, 24s logger.warning(f"Rate limit hit on attempt {attempt + 1}/{max_retries}, backing off {backoff}s: {e}") if attempt < max_retries - 1: time.sleep(backoff) continue elif "503" in error_str or "502" in error_str: logger.warning(f"Tidal service error on attempt {attempt + 1}/{max_retries}, backing off: {e}") if attempt < max_retries - 1: time.sleep(2.0) continue # For other errors, don't retry raise # If we exhausted retries, raise the last exception raise last_exception return wrapper @dataclass class Track: """Tidal track data structure compatible with existing Track objects""" id: str name: str artists: List[str] album: str = "" duration_ms: int = 0 external_urls: Dict[str, str] = None popularity: int = 0 explicit: bool = False def __post_init__(self): if self.external_urls is None: self.external_urls = {} @dataclass class Playlist: """Tidal playlist data structure compatible with existing Playlist objects""" id: str name: str description: str = "" tracks: List[Track] = None external_urls: Dict[str, str] = None owner: Optional[Dict[str, Any]] = None public: bool = True def __post_init__(self): if self.tracks is None: self.tracks = [] if self.external_urls is None: self.external_urls = {} class TidalClient: """Tidal API client for fetching user playlists and track data""" def __init__(self): self.client_id = None self.client_secret = None self.access_token = None self.refresh_token = None self.token_expires_at = 0 self.base_url = "https://openapi.tidal.com/v2" self.alt_base_url = "https://api.tidal.com/v1" # Alternative API base self.auth_url = "https://login.tidal.com/authorize" self.token_url = "https://auth.tidal.com/v1/oauth2/token" _tidal_port = int(os.environ.get('SOULSYNC_TIDAL_CALLBACK_PORT', 8889)) self.redirect_uri = f"http://127.0.0.1:{_tidal_port}/tidal/callback" # Default, will be updated from config self.session = requests.Session() self.auth_server = None self.auth_code = None self.code_verifier = None self.code_challenge = None self._load_config() self._setup_session() # Try to load saved tokens self._load_saved_tokens() def _load_config(self): """Load Tidal configuration from settings""" try: tidal_config = config_manager.get('tidal', {}) self.client_id = tidal_config.get('client_id') self.client_secret = tidal_config.get('client_secret') self.redirect_uri = tidal_config.get('redirect_uri', self.redirect_uri) # Use config or default if not self.client_id or not self.client_secret: logger.warning("Tidal client ID or secret not configured") return False logger.info(f"Loaded Tidal config with client ID: {self.client_id[:8]}...") return True except Exception as e: logger.error(f"Failed to load Tidal configuration: {e}") return False def _setup_session(self): """Setup requests session with headers""" self.session.headers.update({ 'Accept': 'application/vnd.api+json', 'User-Agent': 'SoulSync/1.0' }) def _load_saved_tokens(self): """Load saved tokens from config""" try: tidal_tokens = config_manager.get('tidal_tokens', {}) self.access_token = tidal_tokens.get('access_token') self.refresh_token = tidal_tokens.get('refresh_token') self.token_expires_at = tidal_tokens.get('expires_at', 0) if self.access_token: self.session.headers['Authorization'] = f'Bearer {self.access_token}' logger.info("Loaded saved Tidal tokens") except Exception as e: logger.error(f"Error loading saved Tidal tokens: {e}") def _save_tokens(self): """Save tokens to config""" try: tidal_tokens = { 'access_token': self.access_token, 'refresh_token': self.refresh_token, 'expires_at': self.token_expires_at } config_manager.set('tidal_tokens', tidal_tokens) logger.info("Saved Tidal tokens") except Exception as e: logger.error(f"Error saving Tidal tokens: {e}") def _parse_json_api_track(self, track_data: Dict[str, Any], artist_details_map: Dict[str, Any] = None) -> Optional[Track]: """Parse a track from a JSON:API 'included' object with artist details.""" try: track_id = track_data.get('id') if not track_id: return None attributes = track_data.get('attributes', {}) # Parse artists from relationships and artist details map artists = [] if artist_details_map: relationships = track_data.get('relationships', {}) artist_relationships = relationships.get('artists', {}).get('data', []) for artist_ref in artist_relationships: artist_id = artist_ref.get('id') if artist_id and artist_id in artist_details_map: artist_data = artist_details_map[artist_id] artist_attributes = artist_data.get('attributes', {}) artist_name = artist_attributes.get('name', 'Unknown Artist') artists.append(artist_name) # Fallback if no artists found if not artists: artists = ['Unknown Artist'] # Append version info (e.g. "Bloom remix") to title if present track_title = attributes.get('title', 'Unknown Track') track_version = attributes.get('version') or '' if track_version and track_version.lower() not in track_title.lower(): track_title = f"{track_title} ({track_version})" return Track( id=str(track_id), name=track_title, artists=artists, duration_ms=attributes.get('duration', 0) * 1000 if attributes.get('duration') else 0, # Convert to ms external_urls={'tidal': f"https://tidal.com/browse/track/{track_id}"}, explicit=attributes.get('explicit', False) ) except Exception as e: logger.error(f"Error parsing JSON:API track data: {e}") return None def _generate_pkce_challenge(self): """Generate PKCE code verifier and challenge""" # Generate a random code verifier (43-128 characters) self.code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8').rstrip('=') # Create code challenge (SHA256 hash of verifier, base64 URL-encoded) challenge_bytes = hashlib.sha256(self.code_verifier.encode('utf-8')).digest() self.code_challenge = base64.urlsafe_b64encode(challenge_bytes).decode('utf-8').rstrip('=') logger.info(f"Generated PKCE verifier: {self.code_verifier[:10]}...") logger.info(f"Generated PKCE challenge: {self.code_challenge[:10]}...") def authenticate(self): """Start OAuth authentication flow""" try: if not self.client_id: logger.error("Tidal client ID not configured") return False # Generate PKCE challenge self._generate_pkce_challenge() # Create OAuth URL with PKCE. # `prompt=consent` forces Tidal to display the consent # screen even when the app is already authorized — without # it, re-authenticating with newly-added scopes (e.g. # `collection.read` added in v2.5.0) can silently return a # token carrying only the ORIGINAL scope set because Tidal # treats the existing authorization as still valid. params = { 'response_type': 'code', 'client_id': self.client_id, 'redirect_uri': self.redirect_uri, 'scope': 'user.read playlists.read collection.read', # collection.read needed for userCollectionTracks endpoint 'code_challenge': self.code_challenge, 'code_challenge_method': 'S256', 'prompt': 'consent', } auth_url = f"{self.auth_url}?" + urllib.parse.urlencode(params) logger.info("Starting Tidal OAuth flow...") logger.info(f"OAuth URL: {auth_url}") logger.info(f"Redirect URI: {self.redirect_uri}") # Start callback server self._start_callback_server() # Open browser webbrowser.open(auth_url) # Wait for callback (with timeout) timeout = 120 # 2 minutes start_time = time.time() while not self.auth_code and time.time() - start_time < timeout: time.sleep(0.1) # Stop server if self.auth_server: self.auth_server.shutdown() self.auth_server = None if not self.auth_code: logger.error("Tidal OAuth timeout - no authorization code received") return False # Exchange code for tokens return self._exchange_code_for_tokens() except Exception as e: logger.error(f"Error in Tidal OAuth flow: {e}") return False def _start_callback_server(self): """Start HTTP server to receive OAuth callback""" # Skip starting server in Docker/production mode - web server handles callbacks import os if os.getenv('FLASK_ENV') == 'production' or os.path.exists('/.dockerenv'): logger.info("Docker/WebUI mode detected - skipping TidalClient callback server (web server handles callbacks)") return # Store reference to self for the callback handler tidal_client_ref = self class CallbackHandler(BaseHTTPRequestHandler): def do_GET(handler_self): parsed_url = urllib.parse.urlparse(handler_self.path) query_params = urllib.parse.parse_qs(parsed_url.query) # Debug: Log the full callback URL and parameters logger.info(f"Tidal callback received: {handler_self.path}") logger.info(f"Query parameters: {query_params}") if 'code' in query_params: tidal_client_ref.auth_code = query_params['code'][0] logger.info(f"Received Tidal authorization code: {tidal_client_ref.auth_code[:10]}...") # Send success response handler_self.send_response(200) handler_self.send_header('Content-type', 'text/html') handler_self.end_headers() handler_self.wfile.write(b'

Success!

You can close this window and return to SoulSync.

') elif 'error' in query_params: # Handle OAuth errors error = query_params.get('error', ['unknown'])[0] error_description = query_params.get('error_description', ['No description'])[0] logger.error(f"Tidal OAuth error: {error} - {error_description}") handler_self.send_response(400) handler_self.send_header('Content-type', 'text/html') handler_self.end_headers() handler_self.wfile.write(f'

OAuth Error

Error: {error}

Description: {error_description}

'.encode()) else: logger.error("No authorization code or error in Tidal callback") handler_self.send_response(400) handler_self.send_header('Content-type', 'text/html') handler_self.end_headers() handler_self.wfile.write(b'

Error

Authorization failed - no code received.

') def log_message(handler_self, format, *args): pass # Suppress server logs try: port = int(os.environ.get('SOULSYNC_TIDAL_CALLBACK_PORT', 8889)) self.auth_server = HTTPServer(('localhost', port), CallbackHandler) server_thread = threading.Thread(target=self.auth_server.serve_forever) server_thread.daemon = True server_thread.start() logger.info(f"Started Tidal callback server on port {port}") except Exception as e: logger.error(f"Failed to start Tidal callback server: {e}") @rate_limited def _exchange_code_for_tokens(self): """Exchange authorization code for access tokens""" try: data = { 'grant_type': 'authorization_code', 'code': self.auth_code, 'redirect_uri': self.redirect_uri, 'client_id': self.client_id, 'client_secret': self.client_secret, 'code_verifier': self.code_verifier } response = self.session.post( self.token_url, data=data, headers={ 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json', }, timeout=10 ) if response.status_code == 200: token_data = response.json() self.access_token = token_data.get('access_token') self.refresh_token = token_data.get('refresh_token') expires_in = token_data.get('expires_in', 3600) self.token_expires_at = time.time() + expires_in - 60 # Update session headers self.session.headers['Authorization'] = f'Bearer {self.access_token}' # Save tokens self._save_tokens() logger.info("Successfully exchanged Tidal code for tokens") return True else: logger.error(f"Failed to exchange Tidal code: {response.status_code} - {response.text}") return False except Exception as e: logger.error(f"Error exchanging Tidal code for tokens: {e}") return False @rate_limited def _refresh_access_token(self): """Refresh the access token using refresh token""" try: if not self.refresh_token: logger.error("No Tidal refresh token available") return False if not self.client_id or not self.client_secret: logger.debug("Tidal client_id/secret not configured — skipping token refresh") # Clear stale tokens so we stop retrying self.access_token = None self.refresh_token = None self.token_expires_at = 0 return False data = { 'grant_type': 'refresh_token', 'refresh_token': self.refresh_token, 'client_id': self.client_id, 'client_secret': self.client_secret } response = self.session.post( self.token_url, data=data, headers={ 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json', }, timeout=10 ) if response.status_code == 200: token_data = response.json() self.access_token = token_data.get('access_token') expires_in = token_data.get('expires_in', 3600) self.token_expires_at = time.time() + expires_in - 60 # Update refresh token if provided if 'refresh_token' in token_data: self.refresh_token = token_data['refresh_token'] # Update session headers self.session.headers['Authorization'] = f'Bearer {self.access_token}' # Save tokens self._save_tokens() logger.info("Successfully refreshed Tidal access token") return True else: logger.error(f"Failed to refresh Tidal token: {response.status_code} - {response.text}") return False except Exception as e: logger.error(f"Error refreshing Tidal token: {e}") return False def fetch_token_from_code(self, auth_code: str) -> bool: """Exchange authorization code for access tokens (for web server callback)""" try: logger.info(f"Starting token exchange with code: {auth_code[:20]}...") logger.info(f"Using code_verifier: {self.code_verifier[:20] if self.code_verifier else 'None'}...") logger.info(f"Using redirect_uri: {self.redirect_uri}") self.auth_code = auth_code result = self._exchange_code_for_tokens() if result: logger.info("Token exchange successful") else: logger.error("Token exchange failed") return result except Exception as e: logger.error(f"Error in fetch_token_from_code: {e}") import traceback logger.error(f"Full traceback: {traceback.format_exc()}") return False def _ensure_valid_token(self): """Ensure we have a valid access token""" if not self.access_token: logger.info("No Tidal access token - need to authenticate") return self.authenticate() if time.time() >= self.token_expires_at: logger.info("Tidal access token expired - refreshing...") if self.refresh_token: return self._refresh_access_token() else: logger.info("No refresh token - need to re-authenticate") return self.authenticate() return True def is_authenticated(self): """Check if client is authenticated, refreshing expired tokens if possible""" if self.access_token and time.time() < self.token_expires_at: return True # Backoff: if refresh recently failed, don't retry for 5 minutes if hasattr(self, '_refresh_failed_at') and self._refresh_failed_at: if time.time() - self._refresh_failed_at < 300: return False # Token expired but refresh token available — try silent refresh if self.access_token and self.refresh_token: logger.info("Tidal access token expired — attempting silent refresh...") result = self._refresh_access_token() if not result: self._refresh_failed_at = time.time() return result return False def disconnect(self): """Clear all saved Tidal auth state so the next OAuth flow starts fresh with the current scope set. Used when a previously-authorized token doesn't carry a newly- added scope (e.g. `collection.read`): even with `prompt=consent` on the auth URL, some users hit a Tidal flow that rebinds the existing grant. Disconnect first → re-authenticate forces a clean slate.""" self.access_token = None self.refresh_token = None self.token_expires_at = 0 self._collection_needs_reconnect = False self.session.headers.pop('Authorization', None) try: config_manager.set('tidal_tokens', {}) except Exception as e: logger.warning(f"Failed to clear tidal_tokens config: {e}") logger.info("Tidal client disconnected — saved tokens cleared") def _get_user_id(self): """Get current user's ID from /users/me endpoint""" try: endpoints_to_try = [ # V2 API (Prioritize this as it matches your documentation) (f"{self.base_url}/users/me", "v2"), (f"{self.base_url}/me", "v2 alt"), # V1 API (f"{self.alt_base_url}/users/me", "v1") ] for endpoint, version in endpoints_to_try: try: logger.info(f"Trying to get user ID from {version}: {endpoint}") if version == "v1": headers = { 'Accept': 'application/json', 'Authorization': f'Bearer {self.access_token}', 'User-Agent': 'TIDAL_ANDROID/2.47.1 okhttp/4.9.0' } params = {'countryCode': 'US'} else: # For v2, use the standard session headers headers = self.session.headers.copy() # The v2 endpoint also requires the correct 'accept' header headers['accept'] = 'application/vnd.api+json' params = {} response = requests.get(endpoint, headers=headers, params=params, timeout=10) logger.info(f"User ID response: {response.status_code}") if response.status_code == 200: data = response.json() logger.info(f"User data keys: {list(data.keys()) if isinstance(data, dict) else 'Not a dict'}") # --- START OF CORRECTION --- # Correctly parse the nested JSON structure from your example user_id = None if 'data' in data and isinstance(data['data'], dict): user_id = data['data'].get('id') # Fallback to the original checks for other API responses if not user_id: user_id = data.get('id') or data.get('userId') or data.get('uid') or data.get('user_id') # --- END OF CORRECTION --- if user_id: logger.info(f"Found user ID: {user_id}") return str(user_id), version else: logger.warning(f"No user ID found in response: {data}") else: logger.warning(f"Failed to get user ID: {response.status_code} - {response.text[:200]}") except Exception as e: logger.warning(f"Error getting user ID from {version}: {e}") continue return None, None except Exception as e: logger.error(f"Error in _get_user_id: {e}") return None, None @rate_limited def get_user_playlists_metadata_only(self): """Get user's playlists using the V2 filtered endpoint.""" try: if not self._ensure_valid_token(): logger.error("Not authenticated with Tidal") return [] # Step 1: Get the user ID, which is needed for the filter. user_id, _ = self._get_user_id() if not user_id: logger.error("Could not retrieve Tidal User ID to fetch playlists.") return [] logger.info(f"Using V2 endpoint to fetch playlists for user ID: {user_id}") # Step 2: Construct the correct V2 endpoint and parameters. # NOTE: We don't include 'items' here because the V2 API only includes ~20 tracks # We'll fetch full track lists separately for each playlist endpoint = f"{self.base_url}/playlists" params = { 'countryCode': 'US', 'filter[owners.id]': user_id } headers = self.session.headers.copy() headers['accept'] = 'application/vnd.api+json' response = requests.get(endpoint, params=params, headers=headers, timeout=15) if response.status_code != 200: logger.error(f"Failed to fetch V2 playlists: {response.status_code} - {response.text}") return [] data = response.json() playlists = [] # Step 3: Process the playlists from the main 'data' array. # Only extract metadata — tracks are fetched on-demand when the user # selects a playlist to sync/mirror, not during the listing step. for playlist_data in data.get('data', []): attributes = playlist_data.get('attributes', {}) playlist_id = playlist_data.get('id') # Extract image URL from relationships if available image_url = None try: relationships = playlist_data.get('relationships', {}) image_rel = relationships.get('image', {}).get('data', {}) if image_rel: # Image URL may be in included resources or constructed from ID image_id = image_rel.get('id', '') if image_id: image_url = f"https://resources.tidal.com/images/{image_id.replace('-', '/')}/640x640.jpg" except Exception as _e: logger.debug("tidal v2 playlists image_url extract: %s", _e) new_playlist = Playlist( id=str(playlist_id), name=attributes.get('name', 'Unknown Playlist'), description=attributes.get('description', ''), external_urls={'tidal': f"https://listen.tidal.com/playlist/{playlist_id}"}, public=attributes.get('accessType') == 'PUBLIC', tracks=[], # Empty — fetched on-demand via get_playlist() ) # Store track count from metadata (no API call needed) # V2 API may use different field names depending on version new_playlist.track_count = ( attributes.get('numberOfTracks') or attributes.get('numberOfItems') or attributes.get('totalNumberOfItems') or attributes.get('nrOfTracks') or 0 ) if image_url: new_playlist.image_url = image_url playlists.append(new_playlist) logger.info(f"Successfully retrieved {len(playlists)} playlists (metadata only) with the V2 filter method.") return playlists except Exception as e: logger.error(f"A critical error occurred while fetching Tidal V2 playlists: {e}") return [] def _try_direct_playlist_endpoints(self): """Fallback method to try direct playlist endpoints without user ID""" playlists = [] fallback_endpoints = [ (f"{self.alt_base_url}/my/playlists", "v1 fallback my playlists"), (f"{self.base_url}/me/playlists", "v2 fallback me playlists"), ] for endpoint, description in fallback_endpoints: try: logger.info(f"Fallback: trying {description}") headers = { 'Accept': 'application/json', 'Authorization': f'Bearer {self.access_token}', 'User-Agent': 'TIDAL_ANDROID/2.47.1 okhttp/4.9.0' } if "v1" in description else self.session.headers.copy() response = requests.get(endpoint, headers=headers, params={'limit': 50}, timeout=10) logger.info(f"Fallback response: {response.status_code}") if response.status_code == 200: data = response.json() # Process response same as above items = data.get('items', data.get('data', data if isinstance(data, list) else [])) if items: for item in items: playlist = Playlist( id=item.get('id', item.get('uuid', 'unknown')), name=item.get('title', item.get('name', 'Unknown Playlist')), description=item.get('description', ''), external_urls={'tidal': f"https://tidal.com/browse/playlist/{item.get('uuid', item.get('id'))}"}, public=not item.get('publicPlaylist', True) ) playlists.append(playlist) logger.info(f"Fallback retrieved {len(playlists)} playlists") return playlists except Exception as e: logger.warning(f"Fallback error: {e}") continue logger.error("All Tidal playlist endpoints failed") return playlists @rate_limited def search_tracks(self, query: str, limit: int = 10) -> List[Track]: """Search for tracks using Tidal's search API""" try: if not self._ensure_valid_token(): logger.error("Not authenticated with Tidal") return [] from urllib.parse import quote encoded_query = quote(query, safe='') params = { 'countryCode': 'US', 'include': 'tracks', 'limit': limit } response = self.session.get( f"{self.base_url}/searchResults/{encoded_query}", params=params, timeout=10 ) if response.status_code == 429: raise Exception("Rate limited (429) on search_tracks") if response.status_code == 200: data = response.json() tracks = [] # Handle V2 JSON:API response formats items = [] if 'tracks' in data and isinstance(data['tracks'], list): items = data['tracks'] elif 'tracks' in data and 'items' in data['tracks']: items = data['tracks']['items'] elif 'included' in data: items = [r for r in data['included'] if r.get('type') == 'tracks'] for item in items: # Flatten JSON:API resource if needed if 'attributes' in item and 'id' in item: flat = dict(item['attributes']) flat['id'] = item['id'] item = flat track = self._parse_track_data(item) if track: tracks.append(track) logger.info(f"Found {len(tracks)} Tidal tracks for query: '{query}'") return tracks else: logger.error(f"Tidal search failed: {response.status_code} - {response.text}") return [] except Exception as e: if "429" in str(e): raise # Let rate_limited decorator handle retry logger.error(f"Error searching Tidal tracks: {e}") return [] # ── Enrichment API Methods ── @rate_limited def search_artist(self, name: str) -> Optional[Dict]: """Search for an artist by name. Returns best matching result as raw dict or None.""" try: if not self._ensure_valid_token(): return None from urllib.parse import quote from difflib import SequenceMatcher encoded_query = quote(name, safe='') params = { 'countryCode': 'US', 'include': 'artists', } response = self.session.get( f"{self.base_url}/searchResults/{encoded_query}", params=params, timeout=10 ) if response.status_code == 429: raise Exception("Rate limited (429) on search_artist") if response.status_code == 200: data = response.json() # JSON:API format: included artists in 'artists' or nested in relationships items = [] if 'artists' in data and isinstance(data['artists'], list): items = data['artists'] elif 'artists' in data and 'items' in data['artists']: items = data['artists']['items'] elif 'included' in data: items = [r for r in data['included'] if r.get('type') == 'artists'] if items: # Flatten all items and pick best name match best_item = None best_score = 0.0 for item in items: if 'attributes' in item and 'id' in item: flat = dict(item['attributes']) flat['id'] = item['id'] else: flat = item item_name = flat.get('name', '') score = SequenceMatcher(None, name.lower(), item_name.lower()).ratio() if score > best_score: best_score = score best_item = flat return best_item else: logger.debug(f"Tidal artist search failed: {response.status_code}") return None except Exception as e: if "429" in str(e): raise # Let rate_limited decorator handle retry logger.error(f"Error searching Tidal artist: {e}") return None @rate_limited def search_album(self, artist: str, title: str) -> Optional[Dict]: """Search for an album by artist + title. Returns first result as raw dict or None.""" try: if not self._ensure_valid_token(): return None from urllib.parse import quote query = f"{artist} {title}" if artist else title encoded_query = quote(query, safe='') params = { 'countryCode': 'US', 'include': 'albums', } response = self.session.get( f"{self.base_url}/searchResults/{encoded_query}", params=params, timeout=10 ) if response.status_code == 429: raise Exception("Rate limited (429) on search_album") if response.status_code == 200: data = response.json() items = [] if 'albums' in data and isinstance(data['albums'], list): items = data['albums'] elif 'albums' in data and 'items' in data['albums']: items = data['albums']['items'] elif 'included' in data: items = [r for r in data['included'] if r.get('type') == 'albums'] if items: # Flatten all items and pick best title match from difflib import SequenceMatcher best_item = None best_score = 0.0 for item in items: if 'attributes' in item and 'id' in item: flat = dict(item['attributes']) flat['id'] = item['id'] # Preserve artist relationship for cross-verification try: rel_artists = item.get('relationships', {}).get('artists', {}).get('data', []) if rel_artists: flat['artist'] = {'id': rel_artists[0].get('id')} except (AttributeError, IndexError, TypeError): pass else: flat = item item_title = flat.get('title', '') score = SequenceMatcher(None, title.lower(), item_title.lower()).ratio() if score > best_score: best_score = score best_item = flat return best_item else: logger.debug(f"Tidal album search failed: {response.status_code}") return None except Exception as e: if "429" in str(e): raise # Let rate_limited decorator handle retry logger.error(f"Error searching Tidal album: {e}") return None @rate_limited def search_track(self, artist: str, title: str) -> Optional[Dict]: """Search for a track by artist + title. Returns first result as raw dict or None.""" try: if not self._ensure_valid_token(): return None from urllib.parse import quote query = f"{artist} {title}" if artist else title encoded_query = quote(query, safe='') params = { 'countryCode': 'US', 'include': 'tracks', } response = self.session.get( f"{self.base_url}/searchResults/{encoded_query}", params=params, timeout=10 ) if response.status_code == 429: raise Exception("Rate limited (429) on search_track") if response.status_code == 200: data = response.json() items = [] if 'tracks' in data and isinstance(data['tracks'], list): items = data['tracks'] elif 'tracks' in data and 'items' in data['tracks']: items = data['tracks']['items'] elif 'included' in data: items = [r for r in data['included'] if r.get('type') == 'tracks'] if items: # Flatten all items and pick best title match from difflib import SequenceMatcher best_item = None best_score = 0.0 for item in items: if 'attributes' in item and 'id' in item: flat = dict(item['attributes']) flat['id'] = item['id'] # Preserve artist relationship for cross-verification try: rel_artists = item.get('relationships', {}).get('artists', {}).get('data', []) if rel_artists: flat['artist'] = {'id': rel_artists[0].get('id')} except (AttributeError, IndexError, TypeError): pass else: flat = item item_title = flat.get('title', '') score = SequenceMatcher(None, title.lower(), item_title.lower()).ratio() if score > best_score: best_score = score best_item = flat return best_item else: logger.debug(f"Tidal track search failed: {response.status_code}") return None except Exception as e: if "429" in str(e): raise # Let rate_limited decorator handle retry logger.error(f"Error searching Tidal track: {e}") return None @rate_limited def get_artist(self, artist_id: str) -> Optional[Dict]: """Get full artist details by Tidal ID.""" try: if not self._ensure_valid_token(): return None response = self.session.get( f"{self.base_url}/artists/{artist_id}", params={'countryCode': 'US'}, headers={'accept': 'application/vnd.api+json'}, timeout=10 ) if response.status_code == 429: raise Exception("Rate limited (429) on get_artist") if response.status_code == 200: data = response.json() # Handle JSON:API format if 'data' in data and 'attributes' in data.get('data', {}): result = dict(data['data'].get('attributes', {})) result['id'] = data['data'].get('id', artist_id) return result return data else: logger.debug(f"Tidal get_artist failed: {response.status_code}") return None except Exception as e: if "429" in str(e): raise # Let rate_limited decorator handle retry logger.error(f"Error getting Tidal artist {artist_id}: {e}") return None @rate_limited def get_album(self, album_id: str) -> Optional[Dict]: """Get full album details by Tidal ID.""" try: if not self._ensure_valid_token(): return None response = self.session.get( f"{self.base_url}/albums/{album_id}", params={'countryCode': 'US'}, headers={'accept': 'application/vnd.api+json'}, timeout=10 ) if response.status_code == 429: raise Exception("Rate limited (429) on get_album") if response.status_code == 200: data = response.json() if 'data' in data and 'attributes' in data.get('data', {}): result = dict(data['data'].get('attributes', {})) result['id'] = data['data'].get('id', album_id) return result return data else: logger.debug(f"Tidal get_album failed: {response.status_code}") return None except Exception as e: if "429" in str(e): raise # Let rate_limited decorator handle retry logger.error(f"Error getting Tidal album {album_id}: {e}") return None @rate_limited def get_album_tracks(self, album_id: str, limit: Optional[int] = None) -> List[Track]: """Fetch every track on an album with full artist + name + duration metadata hydrated. Two-phase: walk `/v2/albums/{id}/relationships/items?include=items` cursor chain to enumerate track IDs (with their position metadata — `meta.trackNumber` + `meta.volumeNumber` for multi-disc), then feed the IDs through the existing `_get_tracks_batch` helper for artist + album-name resolution. Returns a list of `Track` dataclasses with `track_number` and `disc_number` attached as ad-hoc attributes so callers that need per-position info (download modal, virtual playlist build) can read them. Backend `/api/discover/album//` serializes these to the same shape Spotify/Deezer return.""" if not self._ensure_valid_token(): return [] # Phase 1: enumerate track IDs + position metadata via cursor pagination. # The relationship endpoint pages at 20 items by default. The `meta` # dict on each ref carries `trackNumber` + `volumeNumber` (multi-disc). track_meta_by_id: Dict[str, Dict[str, int]] = {} track_ids: List[str] = [] next_path: Optional[str] = None while True: if next_path: url = (next_path if next_path.startswith('http') else f"https://openapi.tidal.com/v2{next_path}") params = None else: url = f"{self.base_url}/albums/{album_id}/relationships/items" params = {'countryCode': 'US', 'include': 'items'} try: resp = self.session.get( url, params=params, headers={'accept': 'application/vnd.api+json'}, timeout=15, ) except Exception as e: logger.debug(f"Tidal album-tracks page request failed: {e}") break if resp.status_code != 200: if resp.status_code == 429: raise Exception("Rate limited (429) on get_album_tracks") logger.debug( f"Tidal album-tracks page returned {resp.status_code}: {resp.text[:200]}" ) break try: data = resp.json() except ValueError: break for item in data.get('data', []): if item.get('type') != 'tracks': continue tid = item.get('id') if not tid: continue tid = str(tid) meta = item.get('meta', {}) or {} track_meta_by_id[tid] = { 'track_number': int(meta.get('trackNumber') or 0), 'disc_number': int(meta.get('volumeNumber') or 1), } track_ids.append(tid) if limit is not None and len(track_ids) >= limit: break if limit is not None and len(track_ids) >= limit: break next_path = data.get('links', {}).get('next') if not next_path: break time.sleep(0.3) if not track_ids: return [] # Phase 2: batch hydrate via existing helper (artists + album names). # Annotate each Track with position metadata so callers can build the # per-track-number payload the download pipeline expects. hydrated: List[Track] = [] for i in range(0, len(track_ids), self._COLLECTION_BATCH_SIZE): batch_ids = track_ids[i:i + self._COLLECTION_BATCH_SIZE] try: batch_tracks = self._get_tracks_batch(batch_ids) except Exception as e: logger.debug(f"Tidal album-tracks batch hydration failed: {e}") continue for t in batch_tracks: meta = track_meta_by_id.get(str(t.id), {}) t.track_number = meta.get('track_number', 0) t.disc_number = meta.get('disc_number', 1) hydrated.append(t) # Tidal's relationship walk returns tracks in album order; the # batch endpoint may not preserve order. Sort by (disc, track) # so the modal renders the album top-down. hydrated.sort(key=lambda t: ( getattr(t, 'disc_number', 1), getattr(t, 'track_number', 0), )) logger.info( f"Retrieved {len(hydrated)}/{len(track_ids)} tracks for Tidal album {album_id}" ) return hydrated @rate_limited def get_track(self, track_id: str) -> Optional[Dict]: """Get full track details by Tidal ID.""" try: if not self._ensure_valid_token(): return None response = self.session.get( f"{self.base_url}/tracks/{track_id}", params={'countryCode': 'US'}, headers={'accept': 'application/vnd.api+json'}, timeout=10 ) if response.status_code == 429: raise Exception("Rate limited (429) on get_track") if response.status_code == 200: data = response.json() if 'data' in data and 'attributes' in data.get('data', {}): result = dict(data['data'].get('attributes', {})) result['id'] = data['data'].get('id', track_id) return result return data else: logger.debug(f"Tidal get_track failed: {response.status_code}") return None except Exception as e: if "429" in str(e): raise # Let rate_limited decorator handle retry logger.error(f"Error getting Tidal track {track_id}: {e}") return None @rate_limited def get_playlist(self, playlist_id: str) -> Optional[Playlist]: """Get playlist details including tracks using JSON:API format. Recognizes the virtual ``tidal-favorites`` ID and dispatches to ``get_collection_tracks`` so every caller that already accepts a playlist ID (mirror auto-refresh, discovery start, per-playlist detail endpoint) gets Favorite Tracks support for free without per-site special-casing. ID intentionally has no colon — the sync-services.js renderer builds CSS selectors via template literal interpolation (``#tidal-card-${p.id} .playlist-card-track-count``) and a ``:`` in the ID would be parsed as a CSS pseudo-class operator. """ try: if playlist_id == COLLECTION_PLAYLIST_ID: collection_tracks = self.get_collection_tracks() return Playlist( id=COLLECTION_PLAYLIST_ID, name=COLLECTION_PLAYLIST_NAME, description=COLLECTION_PLAYLIST_DESCRIPTION, tracks=collection_tracks, owner={'name': 'You'}, public=False, ) if not self._ensure_valid_token(): logger.error("Not authenticated with Tidal") return None # Get playlist metadata with JSON:API format headers = {'accept': 'application/vnd.api+json'} response = self.session.get( f"{self.base_url}/playlists/{playlist_id}", params={'countryCode': 'US'}, headers=headers, timeout=10 ) response.raise_for_status() if response.status_code != 200: logger.error(f"Failed to get Tidal playlist {playlist_id}: {response.status_code} - {response.text}") return None # Parse JSON:API response structure playlist_data = response.json().get("data", {}) playlist_attrs = playlist_data.get("attributes", {}) # Get playlist tracks with cursor-based pagination tracks = [] cursor = None total_fetched = 0 page_num = 0 consecutive_failures = 0 MAX_PAGE_RETRIES = 3 while True: page_num += 1 # Rate limit between pagination pages (skip first page) if page_num > 1: time.sleep(1.0) # Fetch a page of track IDs try: tracks_page = self._get_playlist_tracks_page(playlist_id, cursor) except Exception as e: error_str = str(e) if "429" in error_str or "rate limit" in error_str.lower(): consecutive_failures += 1 if consecutive_failures <= MAX_PAGE_RETRIES: backoff = 10.0 * consecutive_failures # 10s, 20s, 30s logger.warning(f"Playlist pagination rate limited on page {page_num}, waiting {backoff}s (attempt {consecutive_failures}/{MAX_PAGE_RETRIES})") time.sleep(backoff) page_num -= 1 # Retry same page continue else: logger.error(f"Playlist pagination failed after {MAX_PAGE_RETRIES} retries, returning {total_fetched} tracks fetched so far") break else: logger.error(f"Error fetching playlist page {page_num}: {e}") break if not tracks_page or not tracks_page.get("data"): logger.info("No more tracks found, stopping pagination") break # Reset failure counter on success consecutive_failures = 0 # Extract track IDs from this page track_ids = [] for item in tracks_page.get("data", []): # In JSON:API, relationship items have both 'type' and 'id' # The type should be 'tracks' but we'll be defensive if item.get("type") and item.get("id"): track_ids.append(item.get("id")) if track_ids: # Batch fetch full track details with artists and albums try: batch_tracks = self._get_tracks_batch(track_ids) except Exception as e: logger.error(f"Error fetching track details for page {page_num}: {e}") # Continue pagination — we lose this batch but can still get remaining batch_tracks = [] if len(batch_tracks) < len(track_ids): logger.warning(f"Page {page_num}: requested {len(track_ids)} tracks but only {len(batch_tracks)} returned (some may be unavailable in your region)") tracks.extend(batch_tracks) total_fetched += len(batch_tracks) logger.info(f"Fetched {len(batch_tracks)} tracks in this batch, {total_fetched} total so far") # Get next cursor from Tidal's response # Tidal uses: links.meta.nextCursor (confirmed by PR #113) cursor = tracks_page.get("links", {}).get("meta", {}).get("nextCursor") # If no cursor found, pagination is complete if not cursor: logger.info("No next cursor found, pagination complete") break playlist = Playlist( id=playlist_data.get('id', playlist_id), name=playlist_attrs.get('name', 'Unknown Playlist'), description=playlist_attrs.get('description', ''), tracks=tracks, external_urls={'tidal': f"https://listen.tidal.com/playlist/{playlist_id}"}, public=playlist_attrs.get('accessType', '') == "PUBLIC" ) # Extract cover image URL from relationships (same logic as get_user_playlists_metadata_only) try: relationships = playlist_data.get('relationships', {}) image_rel = relationships.get('image', {}).get('data', {}) if image_rel: image_id = image_rel.get('id', '') if image_id: playlist.image_url = f"https://resources.tidal.com/images/{image_id.replace('-', '/')}/640x640.jpg" except Exception as _e: logger.debug("tidal playlist image_url extract: %s", _e) logger.info(f"Retrieved Tidal playlist '{playlist.name}' with {len(tracks)} tracks") return playlist except Exception as e: logger.error(f"Error getting Tidal playlist {playlist_id}: {e}") return None @rate_limited def _get_playlist_tracks_page(self, playlist_id: str, cursor: Optional[str] = None) -> Optional[Dict[str, Any]]: """Fetch a page of track IDs from a playlist using cursor-based pagination""" try: params = {"countryCode": "US"} if cursor: params["page[cursor]"] = cursor headers = {'accept': 'application/vnd.api+json'} response = self.session.get( f"{self.base_url}/playlists/{playlist_id}/relationships/items", params=params, headers=headers, timeout=10 ) response.raise_for_status() if response.status_code != 200: logger.error(f"Failed to get playlist tracks page: {response.status_code} - {response.text}") return None return response.json() except requests.exceptions.HTTPError: raise # Let HTTP errors (429, 503, etc.) propagate to rate_limited decorator for retry except Exception as e: logger.error(f"Error fetching playlist tracks page: {e}") return None @rate_limited def _get_tracks_batch(self, track_ids: List[str]) -> List[Track]: """Batch fetch track details with artists and albums included""" try: if not track_ids: return [] params = { "countryCode": "US", "include": "artists,albums", "filter[id]": ",".join(track_ids) } headers = {'accept': 'application/vnd.api+json'} response = self.session.get( f"{self.base_url}/tracks", params=params, headers=headers, timeout=10 ) response.raise_for_status() if response.status_code != 200: logger.error(f"Failed to get tracks batch: {response.status_code} - {response.text}") return [] tracks_data = response.json() # Build lookup caches for albums and artists from included data album_cache: Dict[str, str] = {} artist_cache: Dict[str, str] = {} for item in tracks_data.get("included", []): item_id = item.get("id") item_type = item.get("type") if item_type == "albums": album_cache[item_id] = item.get("attributes", {}).get("title", "Unknown Album") elif item_type == "artists": artist_cache[item_id] = item.get("attributes", {}).get("name", "Unknown Artist") # Parse tracks and hydrate with artist/album data hydrated_tracks: List[Track] = [] for track_data in tracks_data.get("data", []): attrs = track_data.get("attributes", {}) track_id = track_data.get("id") relationships = track_data.get("relationships", {}) # Get album name from cache album_data = relationships.get("albums", {}).get("data", []) album_id = album_data[0].get("id") if album_data else None album = album_cache.get(album_id, "Unknown Album") # Get artist names from cache artist_data_list = relationships.get("artists", {}).get("data", []) artists = [ artist_cache.get(artist_ref.get("id"), "Unknown Artist") for artist_ref in artist_data_list if artist_ref.get("id") ] if not artists: artists = ["Unknown Artist"] # Parse duration (ISO-8601 format like 'PT3M36S') duration_ms = self._parse_iso_duration(attrs.get('duration', '')) # Append version info (e.g. "BMotion Remix") to title if present track_title = attrs.get('title', 'Unknown Track') track_version = attrs.get('version') or '' if track_version and track_version.lower() not in track_title.lower(): track_title = f"{track_title} ({track_version})" hydrated_tracks.append(Track( id=str(track_id), name=track_title, artists=artists, album=album, duration_ms=duration_ms, external_urls={'tidal': f"https://listen.tidal.com/track/{track_id}"}, explicit=attrs.get('explicit', False) )) return hydrated_tracks except requests.exceptions.HTTPError: raise # Let HTTP errors (429, 503, etc.) propagate to rate_limited decorator for retry except Exception as e: logger.error(f"Error getting tracks batch: {e}") return [] def _parse_iso_duration(self, duration: str) -> int: """Convert ISO-8601 duration string (e.g., 'PT3M36S' or 'PT1H30M45S') to milliseconds""" if not duration or not duration.startswith("PT"): return 0 total_seconds = 0 # Extract hours, minutes, and seconds using regex hours_match = re.search(r"(\d+)H", duration) minutes_match = re.search(r"(\d+)M", duration) seconds_match = re.search(r"(\d+)S", duration) if hours_match: total_seconds += int(hours_match.group(1)) * 3600 if minutes_match: total_seconds += int(minutes_match.group(1)) * 60 if seconds_match: total_seconds += int(seconds_match.group(1)) return total_seconds * 1000 def _parse_track_data(self, item: Dict[str, Any]) -> Optional[Track]: """Parse Tidal track data into Track object""" try: track_id = item.get('id') if not track_id: return None # Extract artist names artists = [] if 'artists' in item: artists = [artist.get('name', 'Unknown') for artist in item['artists']] elif 'artist' in item: artists = [item['artist'].get('name', 'Unknown')] # Append version info (e.g. "Bloom remix") to title if present track_title = item.get('title', 'Unknown Track') track_version = item.get('version') or '' if track_version and track_version.lower() not in track_title.lower(): track_title = f"{track_title} ({track_version})" track = Track( id=str(track_id), name=track_title, artists=artists, album=item.get('album', {}).get('title', 'Unknown Album'), duration_ms=item.get('duration', 0) * 1000, # Convert seconds to ms external_urls={'tidal': f"https://tidal.com/browse/track/{track_id}"}, explicit=item.get('explicit', False) ) return track except Exception as e: logger.error(f"Error parsing Tidal track data: {e}") return None def get_user_info(self) -> Optional[Dict[str, Any]]: """Get current user information""" try: if not self._ensure_valid_token(): logger.error("Not authenticated with Tidal") return None return { 'display_name': 'Tidal User', 'id': 'tidal_user', 'type': 'user' } except Exception as e: logger.error(f"Error getting Tidal user info: {e}") return None # `get_favorite_artists` and `get_favorite_albums` were defined here # against the legacy `/v2/favorites?filter[type]=...` endpoint with a # V1 fallback. Both paths are dead in 2026: V2 returns 404 for # personal favorites (it's scoped to third-party-app-created # collections only), and V1 returns 403 because modern OAuth tokens # carry `collection.read` instead of the legacy `r_usr` scope V1 # demands. Replaced by the V2 user-collection endpoints below — see # the "Favorited albums + artists" section near the end of this class. # ------------------------------------------------------------------ # User Collection ("Favorite Tracks" — Tidal calls this "My Collection") # ------------------------------------------------------------------ # # Tidal V2 exposes the user's favorited tracks via a separate # cursor-paginated endpoint: # # GET /v2/userCollectionTracks/me/relationships/items # ?countryCode=US&locale=en-US&include=items # # Each page returns up to 20 entries in `data[]` (track refs with # `id` + `type='tracks'` + `meta.addedAt`) and an OPTIONAL `links.next` # URL for the next page. The included track resources only carry the # track-level attributes (title, isrc, duration, mediaTags) — artists # and album NAMES come back as relationship-link stubs only, not # embedded data. # # We split the work in two phases: # 1) `_iter_collection_track_ids()` — enumerates every collection # entry by following the cursor chain. Cheap (just IDs + types). # 2) Hydration — feeds the IDs through the existing # `_get_tracks_batch` helper which already knows how to # `include=artists,albums` and produce fully-populated `Track` # objects matching the rest of the codebase. No new parsing, # no new dataclass shape, no duplication of the JSON:API parse. # # Reference: https://github.com/Nezreka/SoulSync/issues/502 — reporter # Yug1900 located the working endpoint after the prior `/v2/favorites` # filter approach returned empty data for personal favorites. # # Auth state — calling code (web_server.py listing endpoint) needs # to know whether an empty result means "user has no favorites" or # "token lacks `collection.read` scope and needs reconnect". The # iter helper sets `self._collection_needs_reconnect = True` when # the endpoint returns 401/403 so the listing endpoint can surface # a user-actionable hint instead of silently hiding the row. _COLLECTION_TRACKS_PATH = "userCollectionTracks/me/relationships/items" _COLLECTION_ALBUMS_PATH = "userCollectionAlbums/me/relationships/items" _COLLECTION_ARTISTS_PATH = "userCollectionArtists/me/relationships/items" _COLLECTION_BATCH_SIZE = 20 # Tidal `filter[id]` page cap @rate_limited def _iter_collection_resource_ids(self, path: str, expected_type: str, max_ids: Optional[int] = None) -> List[str]: """Walk a cursor-paginated collection endpoint and return the list of resource IDs (tracks / albums / artists). Generic across all three favorited-resource endpoints — the only differences between them are the path segment and the ``type`` field on each ``data[]`` entry. Pagination, auth, scope-failure detection, and the diagnostic logging are identical. ``max_ids`` caps the walk early. Returns ``[]`` when not authenticated or when the endpoint refuses (e.g. token without ``collection.read`` scope). On 401/403 also flips ``self._collection_needs_reconnect = True`` so the caller can distinguish 'empty collection' from 'missing scope'.""" # Reset on every call so a successful walk clears any stale flag # left from a prior failed attempt. self._collection_needs_reconnect = False if not self._ensure_valid_token(): logger.debug("Tidal not authenticated — cannot fetch collection %s", expected_type) return [] ids: List[str] = [] next_path: Optional[str] = None while True: if next_path: # `links.next` from Tidal is path-relative to /v2 root, # already carries every query param we need (cursor + countryCode + locale + include). url = next_path if next_path.startswith('http') else f"https://openapi.tidal.com/v2{next_path}" params = None else: url = f"{self.base_url}/{path}" params = { 'countryCode': 'US', 'locale': 'en-US', 'include': 'items', } try: headers = { 'accept': 'application/vnd.api+json', 'Authorization': f'Bearer {self.access_token}', } logger.info( f"Tidal collection request: GET {url} (params={params})" ) resp = requests.get(url, params=params, headers=headers, timeout=15) logger.info( f"Tidal collection response: status={resp.status_code} " f"body[:300]={resp.text[:300]!r}" ) except Exception as e: logger.warning(f"Tidal collection page request failed: {e}") break if resp.status_code != 200: # 401/403 = scope/permission issue. Token predates the # `collection.read` scope expansion or the user revoked # it — flag for the UI hint and bail. if resp.status_code in (401, 403): self._collection_needs_reconnect = True logger.info( "Tidal collection endpoint returned %s — reconnect Tidal " "in Settings → Connections to grant `collection.read` scope.", resp.status_code, ) else: logger.warning( f"Tidal collection page returned {resp.status_code}: {resp.text[:500]}" ) break try: data = resp.json() except ValueError as e: logger.debug(f"Tidal collection response not JSON: {e}") break for item in data.get('data', []): if item.get('type') != expected_type: continue rid = item.get('id') if rid: ids.append(str(rid)) if max_ids is not None and len(ids) >= max_ids: return ids next_path = data.get('links', {}).get('next') if not next_path: break time.sleep(0.3) # Cursor pagination courtesy delay return ids def _iter_collection_track_ids(self, max_ids: Optional[int] = None) -> List[str]: """Favorited tracks — thin wrapper over the generic walker.""" return self._iter_collection_resource_ids( self._COLLECTION_TRACKS_PATH, 'tracks', max_ids, ) def _iter_collection_album_ids(self, max_ids: Optional[int] = None) -> List[str]: """Favorited albums — thin wrapper over the generic walker.""" return self._iter_collection_resource_ids( self._COLLECTION_ALBUMS_PATH, 'albums', max_ids, ) def _iter_collection_artist_ids(self, max_ids: Optional[int] = None) -> List[str]: """Favorited artists — thin wrapper over the generic walker.""" return self._iter_collection_resource_ids( self._COLLECTION_ARTISTS_PATH, 'artists', max_ids, ) def collection_needs_reconnect(self) -> bool: """True when the most recent collection fetch hit a 401/403 — i.e. the saved token doesn't have `collection.read` scope and the user needs to reconnect Tidal in Settings → Connections. Reset to False at the start of every `_iter_collection_track_ids` call so a successful walk clears stale flags.""" return getattr(self, '_collection_needs_reconnect', False) def get_collection_tracks_count(self) -> int: """Total count of tracks in the user's Favorite Tracks. Tidal's cursor pagination doesn't expose a `meta.total` so we walk the full ID list. Cheap relative to hydration (one small request per 20 tracks, no per-track lookups), but linear in collection size — call sparingly.""" try: return len(self._iter_collection_track_ids()) except Exception as e: logger.debug(f"Failed to get Tidal collection tracks count: {e}") return 0 def get_collection_tracks(self, limit: Optional[int] = None) -> List[Track]: """Fetch user's favorited tracks with full artist + album metadata hydrated via `_get_tracks_batch`. Returns the same `Track` shape every other Tidal playlist method returns, so the virtual-playlist plumbing in `web_server.py` can reuse the existing serialization path.""" try: track_ids = self._iter_collection_track_ids(max_ids=limit) if not track_ids: return [] hydrated: List[Track] = [] for i in range(0, len(track_ids), self._COLLECTION_BATCH_SIZE): batch = track_ids[i:i + self._COLLECTION_BATCH_SIZE] try: hydrated.extend(self._get_tracks_batch(batch)) except Exception as e: logger.debug( f"Tidal collection batch hydration failed for IDs {batch}: {e}" ) logger.info( f"Retrieved {len(hydrated)}/{len(track_ids)} tracks from Tidal Favorite Tracks" ) return hydrated except Exception as e: logger.error(f"Error fetching Tidal collection tracks: {e}") return [] # ------------------------------------------------------------------ # Favorited albums + artists — V2 collection endpoints # ------------------------------------------------------------------ # # Same problem the tracks side hit on issue #502: the prior # `/v2/favorites?filter[type]=ALBUMS|ARTISTS` endpoints are # deprecated (404) and the V1 fallback (`/v1/users//favorites/ # albums|artists`) returns 403 because modern OAuth tokens with # `collection.read` scope don't have the legacy `r_usr` scope V1 # requires. Discord-reported symptom: Discover → Your Albums (and # Your Artists) section shows nothing for Tidal users regardless # of how many albums/artists they've favorited. # # Fix mirrors the tracks path: # 1) Cursor-walk `/v2/userCollection{Albums|Artists}/me/relationships/items` # via `_iter_collection_album_ids` / `_iter_collection_artist_ids` # (lifted into the generic `_iter_collection_resource_ids` helper). # 2) Batch-hydrate via `/v2/{albums|artists}?filter[id]=...&include=...` # with single-request fan-out (artists+coverArt for albums, # profileArt for artists). Parses JSON:API `included[]` for # artist names + image URLs. # # Public surface preserves the existing return shape — list of # dicts matching what `database.upsert_liked_album` / # `upsert_liked_artist` consume — so web_server.py callers # (`/api/discover/your-albums-fetch` and equivalent) stay # byte-identical. @rate_limited def _get_albums_batch(self, album_ids: List[str]) -> List[Dict[str, Any]]: """Batch-fetch album metadata + cover art + artist names in one request via JSON:API extended-include semantics. Returns list of dicts matching `database.upsert_liked_album` kwargs.""" if not album_ids: return [] try: params = { 'countryCode': 'US', 'include': 'artists,coverArt', 'filter[id]': ','.join(album_ids), } headers = {'accept': 'application/vnd.api+json'} resp = self.session.get( f"{self.base_url}/albums", params=params, headers=headers, timeout=15, ) if resp.status_code != 200: logger.debug( f"Tidal albums batch returned {resp.status_code}: {resp.text[:200]}" ) return [] data = resp.json() artists_by_id, artworks_by_id = self._build_included_maps(data.get('included', [])) results: List[Dict[str, Any]] = [] for item in data.get('data', []): if item.get('type') != 'albums': continue attrs = item.get('attributes', {}) rels = item.get('relationships', {}) results.append({ 'tidal_id': str(item.get('id', '')), 'album_name': attrs.get('title', '') or '', 'artist_name': self._first_artist_name(rels, artists_by_id), 'image_url': self._first_artwork_url(rels.get('coverArt', {}), artworks_by_id), 'release_date': attrs.get('releaseDate', '') or '', 'total_tracks': int(attrs.get('numberOfItems') or 0), }) return results except Exception as e: logger.debug(f"Tidal _get_albums_batch error: {e}") return [] @rate_limited def _get_artists_batch(self, artist_ids: List[str]) -> List[Dict[str, Any]]: """Batch-fetch artist metadata + profile image. Returns list of dicts matching the prior `get_favorite_artists` shape (`tidal_id`, `name`, `image_url`).""" if not artist_ids: return [] try: params = { 'countryCode': 'US', 'include': 'profileArt', 'filter[id]': ','.join(artist_ids), } headers = {'accept': 'application/vnd.api+json'} resp = self.session.get( f"{self.base_url}/artists", params=params, headers=headers, timeout=15, ) if resp.status_code != 200: logger.debug( f"Tidal artists batch returned {resp.status_code}: {resp.text[:200]}" ) return [] data = resp.json() _, artworks_by_id = self._build_included_maps(data.get('included', [])) results: List[Dict[str, Any]] = [] for item in data.get('data', []): if item.get('type') != 'artists': continue attrs = item.get('attributes', {}) rels = item.get('relationships', {}) results.append({ 'tidal_id': str(item.get('id', '')), 'name': attrs.get('name', '') or '', 'image_url': self._first_artwork_url(rels.get('profileArt', {}), artworks_by_id), }) return results except Exception as e: logger.debug(f"Tidal _get_artists_batch error: {e}") return [] @staticmethod def _build_included_maps(included: List[Dict[str, Any]]): """Index a JSON:API `included[]` array by resource type so the per-resource lookup in batch-hydrate is O(1) per relationship ref rather than O(n).""" artists_by_id: Dict[str, Dict[str, Any]] = {} artworks_by_id: Dict[str, Dict[str, Any]] = {} for inc in included: inc_id = str(inc.get('id', '')) if not inc_id: continue inc_type = inc.get('type') if inc_type == 'artists': artists_by_id[inc_id] = inc elif inc_type == 'artworks': artworks_by_id[inc_id] = inc return artists_by_id, artworks_by_id @staticmethod def _first_artist_name(relationships: Dict[str, Any], artists_by_id: Dict[str, Dict[str, Any]]) -> str: """Resolve the primary artist name from a relationships block + included-artists map. Returns '' if not resolvable so the upsert path doesn't trip on None.""" artist_refs = relationships.get('artists', {}).get('data', []) if not artist_refs: return '' first_id = str(artist_refs[0].get('id', '')) artist_obj = artists_by_id.get(first_id, {}) return artist_obj.get('attributes', {}).get('name', '') or '' @staticmethod def _first_artwork_url(artwork_relationship: Dict[str, Any], artworks_by_id: Dict[str, Dict[str, Any]]) -> Optional[str]: """Resolve the largest cover/profile image URL from an artwork relationship + included-artworks map. Tidal returns files largest-first so picking files[0] gets the highest-resolution variant (typically 1280×1280).""" refs = artwork_relationship.get('data', []) if not refs: return None first_id = str(refs[0].get('id', '')) artwork = artworks_by_id.get(first_id, {}) files = artwork.get('attributes', {}).get('files', []) if not files: return None return files[0].get('href') def get_favorite_albums(self, limit: int = 200) -> List[Dict[str, Any]]: """Fetch user's favorited albums via the V2 user-collection endpoint. Replaces the prior `/v2/favorites` + V1-fallback path which is now dead (V2 endpoint deprecated, V1 returns 403 for modern OAuth tokens lacking `r_usr` scope). Returns list of dicts matching `database.upsert_liked_album` kwargs — the discover.py 'Your Albums' aggregator iterates these and writes them to the liked_albums table.""" try: album_ids = self._iter_collection_album_ids(max_ids=limit) if not album_ids: return [] results: List[Dict[str, Any]] = [] for i in range(0, len(album_ids), self._COLLECTION_BATCH_SIZE): batch = album_ids[i:i + self._COLLECTION_BATCH_SIZE] results.extend(self._get_albums_batch(batch)) logger.info( f"Retrieved {len(results)}/{len(album_ids)} favorite albums from Tidal" ) return results except Exception as e: logger.error(f"Error fetching Tidal favorite albums: {e}") return [] def get_favorite_artists(self, limit: int = 200) -> List[Dict[str, Any]]: """Fetch user's favorited artists via the V2 user-collection endpoint. Replaces the prior `/v2/favorites` + V1-fallback path (dead for the same reason as `get_favorite_albums`). Returns list of dicts matching the prior shape (`tidal_id`, `name`, `image_url`) so web_server.py's `/api/discover/ your-artists-fetch` aggregator path stays byte-identical.""" try: artist_ids = self._iter_collection_artist_ids(max_ids=limit) if not artist_ids: return [] results: List[Dict[str, Any]] = [] for i in range(0, len(artist_ids), self._COLLECTION_BATCH_SIZE): batch = artist_ids[i:i + self._COLLECTION_BATCH_SIZE] results.extend(self._get_artists_batch(batch)) logger.info( f"Retrieved {len(results)}/{len(artist_ids)} favorite artists from Tidal" ) return results except Exception as e: logger.error(f"Error fetching Tidal favorite artists: {e}") return [] # Global instance _tidal_client = None def get_tidal_client() -> TidalClient: """Get global Tidal client instance""" global _tidal_client if _tidal_client is None: _tidal_client = TidalClient() return _tidal_client