From 665281f184095a3bb7aa1e5e2ff182f0533b6112 Mon Sep 17 00:00:00 2001 From: Broque Thomas Date: Thu, 15 Jan 2026 06:44:35 -0800 Subject: [PATCH] Refactor playlist fetching to use Tidal JSON:API Rewrites playlist and track fetching to use Tidal's JSON:API endpoints with cursor-based pagination and batch track hydration. Adds robust rate limiting with retry logic, and introduces ISO-8601 duration parsing. This improves reliability, performance, and compatibility with Tidal's latest API structure. --- core/tidal_client.py | 309 +++++++++++++++++++++++++++++++------------ 1 file changed, 223 insertions(+), 86 deletions(-) diff --git a/core/tidal_client.py b/core/tidal_client.py index 052b87e2..4890b925 100644 --- a/core/tidal_client.py +++ b/core/tidal_client.py @@ -1,5 +1,6 @@ import requests import time +import re import threading from typing import Dict, List, Optional, Any from functools import wraps @@ -23,33 +24,49 @@ _api_call_lock = threading.Lock() MIN_API_INTERVAL = 0.2 # 200ms between API calls def rate_limited(func): - """Decorator to enforce rate limiting on Tidal API calls""" + """Decorator to enforce rate limiting on Tidal API calls with retry logic""" @wraps(func) def wrapper(*args, **kwargs): - global _last_api_call_time - - with _api_call_lock: - current_time = time.time() - time_since_last_call = current_time - _last_api_call_time - - if time_since_last_call < MIN_API_INTERVAL: - sleep_time = MIN_API_INTERVAL - time_since_last_call - time.sleep(sleep_time) - - _last_api_call_time = time.time() - - try: - result = func(*args, **kwargs) - return result - except Exception as e: - # Implement exponential backoff for API errors - if "rate limit" in str(e).lower() or "429" in str(e): - logger.warning(f"Rate limit hit, implementing backoff: {e}") - time.sleep(3.0) # Wait 3 seconds before retrying - elif "503" in str(e) or "502" in str(e): - logger.warning(f"Tidal service error, backing off: {e}") - time.sleep(2.0) # Wait 2 seconds for service errors - raise + max_retries = 3 + last_exception = None + + for attempt in range(max_retries): + global _last_api_call_time + + with _api_call_lock: + current_time = time.time() + time_since_last_call = current_time - _last_api_call_time + + if time_since_last_call < MIN_API_INTERVAL: + sleep_time = MIN_API_INTERVAL - time_since_last_call + time.sleep(sleep_time) + + _last_api_call_time = time.time() + + try: + result = func(*args, **kwargs) + return result + except Exception as e: + last_exception = e + error_str = str(e) + + # Only retry on specific errors + if "rate limit" in error_str.lower() or "429" in error_str: + logger.warning(f"Rate limit hit on attempt {attempt + 1}/{max_retries}, backing off: {e}") + if attempt < max_retries - 1: + time.sleep(3.0) + continue + elif "503" in error_str or "502" in error_str: + logger.warning(f"Tidal service error on attempt {attempt + 1}/{max_retries}, backing off: {e}") + if attempt < max_retries - 1: + time.sleep(2.0) + continue + + # For other errors, don't retry + raise + + # If we exhausted retries, raise the last exception + raise last_exception return wrapper @dataclass @@ -550,7 +567,7 @@ class TidalClient: endpoint = f"{self.base_url}/playlists" params = { 'countryCode': 'US', - 'filter[r.owners.id]': user_id + 'filter[owners.id]': user_id } headers = self.session.headers.copy() @@ -687,97 +704,217 @@ class TidalClient: @rate_limited def get_playlist(self, playlist_id: str) -> Optional[Playlist]: - """Get playlist details including tracks""" + """Get playlist details including tracks using JSON:API format""" try: if not self._ensure_valid_token(): logger.error("Not authenticated with Tidal") return None - - # Get playlist metadata + + # Get playlist metadata with JSON:API format + headers = {'accept': 'application/vnd.api+json'} response = self.session.get( f"{self.base_url}/playlists/{playlist_id}", params={'countryCode': 'US'}, + headers=headers, timeout=10 ) - + + response.raise_for_status() + if response.status_code != 200: logger.error(f"Failed to get Tidal playlist {playlist_id}: {response.status_code} - {response.text}") return None - - playlist_data = response.json() - # Get playlist tracks with pagination to handle large playlists + # Parse JSON:API response structure + playlist_data = response.json().get("data", {}) + playlist_attrs = playlist_data.get("attributes", {}) + + # Get playlist tracks with cursor-based pagination tracks = [] - offset = 0 - limit = 100 + cursor = None total_fetched = 0 while True: - logger.info(f"Fetching tracks for playlist {playlist_id}: offset={offset}, limit={limit}") - - tracks_response = self.session.get( - f"{self.base_url}/playlists/{playlist_id}/items", - params={'countryCode': 'US', 'limit': limit, 'offset': offset}, - timeout=10 - ) - - if tracks_response.status_code != 200: - logger.error(f"Failed to get Tidal playlist tracks at offset {offset}: {tracks_response.status_code} - {tracks_response.text}") - break - - tracks_data = tracks_response.json() - - if 'items' not in tracks_data: - logger.warning(f"No items found in playlist {playlist_id} response at offset {offset}") - break + # Fetch a page of track IDs + tracks_page = self._get_playlist_tracks_page(playlist_id, cursor) - items = tracks_data['items'] - if len(items) == 0: - logger.info(f"No more tracks found at offset {offset}, stopping pagination") + if not tracks_page or not tracks_page.get("data"): + logger.info(f"No more tracks found, stopping pagination") break - # Process this batch of tracks - batch_count = 0 - for item in items: - # Handle different item structures - track_data = item - if 'item' in item and item.get('type') == 'track': - track_data = item['item'] - elif 'resource' in item: - track_data = item['resource'] - - track = self._parse_track_data(track_data) - if track: - tracks.append(track) - batch_count += 1 - - total_fetched += batch_count - logger.info(f"Fetched {batch_count} tracks in this batch, {total_fetched} total so far") - - # If we got fewer items than the limit, we've reached the end - if len(items) < limit: - logger.info(f"Received {len(items)} items (less than limit {limit}), pagination complete") + # Extract track IDs from this page + track_ids = [] + for item in tracks_page.get("data", []): + # In JSON:API, relationship items have both 'type' and 'id' + # The type should be 'tracks' but we'll be defensive + if item.get("type") and item.get("id"): + track_ids.append(item.get("id")) + + if track_ids: + # Batch fetch full track details with artists and albums + batch_tracks = self._get_tracks_batch(track_ids) + tracks.extend(batch_tracks) + total_fetched += len(batch_tracks) + logger.info(f"Fetched {len(batch_tracks)} tracks in this batch, {total_fetched} total so far") + + # Get next cursor from meta (not nested in links) + cursor = tracks_page.get("meta", {}).get("nextCursor") + + # If no next cursor exists, we're done + if not cursor: + logger.info("No next cursor found, pagination complete") break - # Move to next page - offset += limit - playlist = Playlist( id=playlist_data.get('id', playlist_id), - name=playlist_data.get('title', 'Unknown Playlist'), - description=playlist_data.get('description', ''), + name=playlist_attrs.get('name', 'Unknown Playlist'), + description=playlist_attrs.get('description', ''), tracks=tracks, - external_urls={'tidal': f"https://tidal.com/browse/playlist/{playlist_id}"}, - public=not playlist_data.get('publicPlaylist', True) # Tidal uses 'publicPlaylist' field + external_urls={'tidal': f"https://listen.tidal.com/playlist/{playlist_id}"}, + public=playlist_attrs.get('accessType', '') == "PUBLIC" ) - + logger.info(f"Retrieved Tidal playlist '{playlist.name}' with {len(tracks)} tracks") return playlist - + except Exception as e: logger.error(f"Error getting Tidal playlist {playlist_id}: {e}") return None - + + @rate_limited + def _get_playlist_tracks_page(self, playlist_id: str, cursor: Optional[str] = None) -> Optional[Dict[str, Any]]: + """Fetch a page of track IDs from a playlist using cursor-based pagination""" + try: + params = {"countryCode": "US"} + if cursor: + params["page[cursor]"] = cursor + + headers = {'accept': 'application/vnd.api+json'} + response = self.session.get( + f"{self.base_url}/playlists/{playlist_id}/relationships/items", + params=params, + headers=headers, + timeout=10 + ) + + response.raise_for_status() + + if response.status_code != 200: + logger.error(f"Failed to get playlist tracks page: {response.status_code} - {response.text}") + return None + + return response.json() + + except Exception as e: + logger.error(f"Error fetching playlist tracks page: {e}") + return None + + @rate_limited + def _get_tracks_batch(self, track_ids: List[str]) -> List[Track]: + """Batch fetch track details with artists and albums included""" + try: + if not track_ids: + return [] + + params = { + "countryCode": "US", + "include": "artists,albums", + "filter[id]": ",".join(track_ids) + } + + headers = {'accept': 'application/vnd.api+json'} + response = self.session.get( + f"{self.base_url}/tracks", + params=params, + headers=headers, + timeout=10 + ) + + response.raise_for_status() + + if response.status_code != 200: + logger.error(f"Failed to get tracks batch: {response.status_code} - {response.text}") + return [] + + tracks_data = response.json() + + # Build lookup caches for albums and artists from included data + album_cache: Dict[str, str] = {} + artist_cache: Dict[str, str] = {} + + for item in tracks_data.get("included", []): + item_id = item.get("id") + item_type = item.get("type") + + if item_type == "albums": + album_cache[item_id] = item.get("attributes", {}).get("title", "Unknown Album") + elif item_type == "artists": + artist_cache[item_id] = item.get("attributes", {}).get("name", "Unknown Artist") + + # Parse tracks and hydrate with artist/album data + hydrated_tracks: List[Track] = [] + + for track_data in tracks_data.get("data", []): + attrs = track_data.get("attributes", {}) + track_id = track_data.get("id") + relationships = track_data.get("relationships", {}) + + # Get album name from cache + album_data = relationships.get("albums", {}).get("data", []) + album_id = album_data[0].get("id") if album_data else None + album = album_cache.get(album_id, "Unknown Album") + + # Get artist names from cache + artist_data_list = relationships.get("artists", {}).get("data", []) + artists = [ + artist_cache.get(artist_ref.get("id"), "Unknown Artist") + for artist_ref in artist_data_list + if artist_ref.get("id") + ] + + if not artists: + artists = ["Unknown Artist"] + + # Parse duration (ISO-8601 format like 'PT3M36S') + duration_ms = self._parse_iso_duration(attrs.get('duration', '')) + + hydrated_tracks.append(Track( + id=str(track_id), + name=attrs.get('title', 'Unknown Track'), + artists=artists, + album=album, + duration_ms=duration_ms, + external_urls={'tidal': f"https://listen.tidal.com/track/{track_id}"}, + explicit=attrs.get('explicit', False) + )) + + return hydrated_tracks + + except Exception as e: + logger.error(f"Error getting tracks batch: {e}") + return [] + + def _parse_iso_duration(self, duration: str) -> int: + """Convert ISO-8601 duration string (e.g., 'PT3M36S' or 'PT1H30M45S') to milliseconds""" + if not duration or not duration.startswith("PT"): + return 0 + + total_seconds = 0 + + # Extract hours, minutes, and seconds using regex + hours_match = re.search(r"(\d+)H", duration) + minutes_match = re.search(r"(\d+)M", duration) + seconds_match = re.search(r"(\d+)S", duration) + + if hours_match: + total_seconds += int(hours_match.group(1)) * 3600 + if minutes_match: + total_seconds += int(minutes_match.group(1)) * 60 + if seconds_match: + total_seconds += int(seconds_match.group(1)) + + return total_seconds * 1000 + def _parse_track_data(self, item: Dict[str, Any]) -> Optional[Track]: """Parse Tidal track data into Track object""" try: