mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
800 lines
31 KiB
800 lines
31 KiB
import spotipy
|
|
from spotipy.oauth2 import SpotifyOAuth, SpotifyClientCredentials
|
|
from typing import Dict, List, Optional, Any
|
|
import time
|
|
import threading
|
|
from functools import wraps
|
|
from dataclasses import dataclass
|
|
from utils.logging_config import get_logger
|
|
from config.settings import config_manager
|
|
|
|
logger = get_logger("spotify_client")
|
|
|
|
# Global rate limiting variables
|
|
_last_api_call_time = 0
|
|
_api_call_lock = threading.Lock()
|
|
MIN_API_INTERVAL = 0.2 # 200ms between API calls (more conservative to avoid bans)
|
|
|
|
# Request queuing for burst handling
|
|
import queue
|
|
_request_queue = queue.Queue()
|
|
_queue_processor_running = False
|
|
|
|
def rate_limited(func):
|
|
"""Decorator to enforce rate limiting on Spotify API calls with retry and exponential backoff"""
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
global _last_api_call_time
|
|
|
|
max_retries = 5
|
|
|
|
for attempt in range(max_retries + 1):
|
|
# Enforce minimum interval between API calls
|
|
with _api_call_lock:
|
|
current_time = time.time()
|
|
time_since_last_call = current_time - _last_api_call_time
|
|
|
|
if time_since_last_call < MIN_API_INTERVAL:
|
|
sleep_time = MIN_API_INTERVAL - time_since_last_call
|
|
time.sleep(sleep_time)
|
|
|
|
_last_api_call_time = time.time()
|
|
|
|
try:
|
|
return func(*args, **kwargs)
|
|
except Exception as e:
|
|
error_str = str(e).lower()
|
|
is_rate_limit = "rate limit" in error_str or "429" in str(e)
|
|
is_server_error = "502" in str(e) or "503" in str(e)
|
|
|
|
if is_rate_limit and attempt < max_retries:
|
|
# Try to extract Retry-After from spotipy exception headers
|
|
retry_after = None
|
|
if hasattr(e, 'headers') and e.headers:
|
|
retry_after = e.headers.get('Retry-After') or e.headers.get('retry-after')
|
|
|
|
if retry_after:
|
|
try:
|
|
delay = int(retry_after) + 1
|
|
except (ValueError, TypeError):
|
|
delay = 3.0 * (2 ** attempt)
|
|
else:
|
|
delay = 3.0 * (2 ** attempt) # 3, 6, 12, 24, 48
|
|
|
|
logger.warning(f"Spotify rate limit hit, retrying in {delay:.0f}s (attempt {attempt + 1}/{max_retries}): {func.__name__}")
|
|
time.sleep(delay)
|
|
continue
|
|
|
|
elif is_server_error and attempt < max_retries:
|
|
delay = 2.0 * (2 ** attempt) # 2, 4, 8, 16, 32
|
|
logger.warning(f"Spotify server error, retrying in {delay:.0f}s (attempt {attempt + 1}/{max_retries}): {func.__name__}")
|
|
time.sleep(delay)
|
|
continue
|
|
|
|
raise
|
|
return wrapper
|
|
|
|
@dataclass
|
|
class Track:
|
|
id: str
|
|
name: str
|
|
artists: List[str]
|
|
album: str
|
|
duration_ms: int
|
|
popularity: int
|
|
preview_url: Optional[str] = None
|
|
external_urls: Optional[Dict[str, str]] = None
|
|
image_url: Optional[str] = None
|
|
|
|
@classmethod
|
|
def from_spotify_track(cls, track_data: Dict[str, Any]) -> 'Track':
|
|
# Extract album image (medium size preferred)
|
|
album_image_url = None
|
|
if 'album' in track_data and 'images' in track_data['album']:
|
|
images = track_data['album']['images']
|
|
if images:
|
|
# Get medium size image (usually index 1), or largest if not available
|
|
album_image_url = images[1]['url'] if len(images) > 1 else images[0]['url']
|
|
|
|
return cls(
|
|
id=track_data['id'],
|
|
name=track_data['name'],
|
|
artists=[artist['name'] for artist in track_data['artists']],
|
|
album=track_data['album']['name'],
|
|
duration_ms=track_data['duration_ms'],
|
|
popularity=track_data.get('popularity', 0),
|
|
preview_url=track_data.get('preview_url'),
|
|
external_urls=track_data.get('external_urls'),
|
|
image_url=album_image_url
|
|
)
|
|
|
|
@dataclass
|
|
class Artist:
|
|
id: str
|
|
name: str
|
|
popularity: int
|
|
genres: List[str]
|
|
followers: int
|
|
image_url: Optional[str] = None
|
|
external_urls: Optional[Dict[str, str]] = None
|
|
|
|
@classmethod
|
|
def from_spotify_artist(cls, artist_data: Dict[str, Any]) -> 'Artist':
|
|
# Get the largest image URL if available
|
|
image_url = None
|
|
if artist_data.get('images') and len(artist_data['images']) > 0:
|
|
image_url = artist_data['images'][0]['url']
|
|
|
|
return cls(
|
|
id=artist_data['id'],
|
|
name=artist_data['name'],
|
|
popularity=artist_data.get('popularity', 0),
|
|
genres=artist_data.get('genres', []),
|
|
followers=artist_data.get('followers', {}).get('total', 0),
|
|
image_url=image_url,
|
|
external_urls=artist_data.get('external_urls')
|
|
)
|
|
|
|
@dataclass
|
|
class Album:
|
|
id: str
|
|
name: str
|
|
artists: List[str]
|
|
release_date: str
|
|
total_tracks: int
|
|
album_type: str
|
|
image_url: Optional[str] = None
|
|
external_urls: Optional[Dict[str, str]] = None
|
|
artist_ids: Optional[List[str]] = None
|
|
|
|
@classmethod
|
|
def from_spotify_album(cls, album_data: Dict[str, Any]) -> 'Album':
|
|
# Get the largest image URL if available
|
|
image_url = None
|
|
if album_data.get('images') and len(album_data['images']) > 0:
|
|
image_url = album_data['images'][0]['url']
|
|
|
|
return cls(
|
|
id=album_data['id'],
|
|
name=album_data['name'],
|
|
artists=[artist['name'] for artist in album_data['artists']],
|
|
release_date=album_data.get('release_date', ''),
|
|
total_tracks=album_data.get('total_tracks', 0),
|
|
album_type=album_data.get('album_type', 'album'),
|
|
image_url=image_url,
|
|
external_urls=album_data.get('external_urls'),
|
|
artist_ids=[artist['id'] for artist in album_data['artists']]
|
|
)
|
|
|
|
@dataclass
|
|
class Playlist:
|
|
id: str
|
|
name: str
|
|
description: Optional[str]
|
|
owner: str
|
|
public: bool
|
|
collaborative: bool
|
|
tracks: List[Track]
|
|
total_tracks: int
|
|
|
|
@classmethod
|
|
def from_spotify_playlist(cls, playlist_data: Dict[str, Any], tracks: List[Track]) -> 'Playlist':
|
|
return cls(
|
|
id=playlist_data['id'],
|
|
name=playlist_data['name'],
|
|
description=playlist_data.get('description'),
|
|
owner=playlist_data['owner']['display_name'],
|
|
public=playlist_data['public'],
|
|
collaborative=playlist_data['collaborative'],
|
|
tracks=tracks,
|
|
total_tracks=(playlist_data.get('tracks') or playlist_data.get('items') or {}).get('total', 0)
|
|
)
|
|
|
|
class SpotifyClient:
|
|
def __init__(self):
|
|
self.sp: Optional[spotipy.Spotify] = None
|
|
self.user_id: Optional[str] = None
|
|
self._itunes_client = None # Lazy-loaded iTunes fallback
|
|
self._auth_cache_lock = threading.Lock()
|
|
self._auth_cached_result: Optional[bool] = None
|
|
self._auth_cache_time: float = 0
|
|
self._AUTH_CACHE_TTL = 60 # seconds
|
|
self._setup_client()
|
|
|
|
def _is_spotify_id(self, id_str: str) -> bool:
|
|
"""Check if an ID is a Spotify ID (alphanumeric) vs iTunes ID (numeric only)"""
|
|
if not id_str:
|
|
return False
|
|
# Spotify IDs contain letters and numbers, iTunes IDs are purely numeric
|
|
return not id_str.isdigit()
|
|
|
|
def _is_itunes_id(self, id_str: str) -> bool:
|
|
"""Check if an ID is an iTunes ID (numeric only)"""
|
|
if not id_str:
|
|
return False
|
|
return id_str.isdigit()
|
|
|
|
@property
|
|
def _itunes(self):
|
|
"""Lazy-load iTunes client for fallback when Spotify not authenticated"""
|
|
if self._itunes_client is None:
|
|
from core.itunes_client import iTunesClient
|
|
self._itunes_client = iTunesClient()
|
|
logger.info("iTunes fallback client initialized")
|
|
return self._itunes_client
|
|
|
|
def reload_config(self):
|
|
"""Reload configuration and re-initialize client"""
|
|
self._invalidate_auth_cache()
|
|
self._setup_client()
|
|
|
|
def _setup_client(self):
|
|
config = config_manager.get_spotify_config()
|
|
|
|
if not config.get('client_id') or not config.get('client_secret'):
|
|
logger.warning("Spotify credentials not configured")
|
|
return
|
|
|
|
try:
|
|
auth_manager = SpotifyOAuth(
|
|
client_id=config['client_id'],
|
|
client_secret=config['client_secret'],
|
|
redirect_uri=config.get('redirect_uri', "http://127.0.0.1:8888/callback"),
|
|
scope="user-library-read user-read-private playlist-read-private playlist-read-collaborative user-read-email",
|
|
cache_path='config/.spotify_cache'
|
|
)
|
|
|
|
self.sp = spotipy.Spotify(auth_manager=auth_manager)
|
|
# Don't fetch user info on startup - do it lazily to avoid blocking UI
|
|
self.user_id = None
|
|
logger.info("Spotify client initialized (user info will be fetched when needed)")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to authenticate with Spotify: {e}")
|
|
self.sp = None
|
|
|
|
def is_authenticated(self) -> bool:
|
|
"""
|
|
Check if client can service metadata requests.
|
|
Returns True if Spotify is authenticated OR iTunes fallback is available.
|
|
For Spotify-specific auth check, use is_spotify_authenticated().
|
|
"""
|
|
# If Spotify is authenticated, we're good
|
|
if self.is_spotify_authenticated():
|
|
return True
|
|
|
|
# iTunes fallback is always available
|
|
return True
|
|
|
|
def _invalidate_auth_cache(self):
|
|
"""Clear the auth cache so the next check makes a fresh API call"""
|
|
with self._auth_cache_lock:
|
|
self._auth_cached_result = None
|
|
self._auth_cache_time = 0
|
|
|
|
def is_spotify_authenticated(self) -> bool:
|
|
"""Check if Spotify client is specifically authenticated (not just iTunes fallback).
|
|
Results are cached for 60 seconds to avoid excessive API calls."""
|
|
if self.sp is None:
|
|
return False
|
|
|
|
# Check cache first (lock only for brief read)
|
|
with self._auth_cache_lock:
|
|
if self._auth_cached_result is not None and (time.time() - self._auth_cache_time) < self._AUTH_CACHE_TTL:
|
|
return self._auth_cached_result
|
|
|
|
# Cache miss — make API call outside the lock.
|
|
# Use a no-retry client to avoid spotipy blocking for hours on 429s
|
|
# (Retry-After can be 2+ hours). The main self.sp client keeps its
|
|
# retries for normal API calls.
|
|
try:
|
|
probe = spotipy.Spotify(auth_manager=self.sp.auth_manager, retries=0)
|
|
probe.current_user()
|
|
result = True
|
|
except Exception as e:
|
|
error_str = str(e)
|
|
# Rate limit means we ARE authenticated — just throttled
|
|
if "rate" in error_str.lower() or "429" in error_str:
|
|
logger.warning("Spotify rate limited during auth check — treating as authenticated")
|
|
result = True
|
|
else:
|
|
logger.debug(f"Spotify authentication check failed: {e}")
|
|
result = False
|
|
|
|
with self._auth_cache_lock:
|
|
self._auth_cached_result = result
|
|
self._auth_cache_time = time.time()
|
|
|
|
return result
|
|
|
|
def disconnect(self):
|
|
"""Disconnect Spotify: clear client, delete cache, invalidate auth cache"""
|
|
import os
|
|
self.sp = None
|
|
self.user_id = None
|
|
self._invalidate_auth_cache()
|
|
|
|
cache_path = 'config/.spotify_cache'
|
|
try:
|
|
if os.path.exists(cache_path):
|
|
os.remove(cache_path)
|
|
logger.info("Deleted Spotify cache file")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to delete Spotify cache: {e}")
|
|
|
|
logger.info("Spotify client disconnected")
|
|
|
|
def _ensure_user_id(self) -> bool:
|
|
"""Ensure user_id is loaded (may make API call)"""
|
|
if self.user_id is None and self.sp is not None:
|
|
try:
|
|
user_info = self.sp.current_user()
|
|
self.user_id = user_info['id']
|
|
logger.info(f"Successfully authenticated with Spotify as {user_info['display_name']}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to fetch user info: {e}")
|
|
return False
|
|
return self.user_id is not None
|
|
|
|
@rate_limited
|
|
def get_user_playlists(self) -> List[Playlist]:
|
|
if not self.is_spotify_authenticated():
|
|
logger.error("Not authenticated with Spotify")
|
|
return []
|
|
|
|
if not self._ensure_user_id():
|
|
logger.error("Failed to get user ID")
|
|
return []
|
|
|
|
playlists = []
|
|
|
|
try:
|
|
results = self.sp.current_user_playlists(limit=50)
|
|
|
|
while results:
|
|
for playlist_data in results['items']:
|
|
# Spotify API already returns all playlists the user has access to
|
|
# (owned + followed), so no need to filter
|
|
logger.info(f"Fetching tracks for playlist: {playlist_data['name']}")
|
|
tracks = self._get_playlist_tracks(playlist_data['id'])
|
|
playlist = Playlist.from_spotify_playlist(playlist_data, tracks)
|
|
playlists.append(playlist)
|
|
|
|
results = self.sp.next(results) if results['next'] else None
|
|
|
|
logger.info(f"Retrieved {len(playlists)} playlists")
|
|
return playlists
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching user playlists: {e}")
|
|
return []
|
|
|
|
@rate_limited
|
|
def get_user_playlists_metadata_only(self) -> List[Playlist]:
|
|
"""Get playlists without fetching all track details for faster loading"""
|
|
if not self.is_spotify_authenticated():
|
|
logger.error("Not authenticated with Spotify")
|
|
return []
|
|
|
|
if not self._ensure_user_id():
|
|
logger.error("Failed to get user ID")
|
|
return []
|
|
|
|
playlists = []
|
|
|
|
try:
|
|
# Fetch all playlists using pagination
|
|
limit = 50 # Maximum allowed by Spotify API
|
|
offset = 0
|
|
total_fetched = 0
|
|
|
|
logger.info("Beginning fetch of user playlists...")
|
|
|
|
while True:
|
|
results = self.sp.current_user_playlists(limit=limit, offset=offset)
|
|
|
|
if not results or 'items' not in results:
|
|
break
|
|
|
|
# Log expected total on first page
|
|
if offset == 0:
|
|
expected_total = results.get('total', 'Unknown')
|
|
logger.info(f"Spotify reports {expected_total} total playlists to fetch.")
|
|
|
|
batch_count = 0
|
|
for playlist_data in results['items']:
|
|
try:
|
|
# Spotify API already returns all playlists the user has access to
|
|
# (owned + followed), so no need to filter
|
|
|
|
# Handle potential missing owner data safely
|
|
if not playlist_data.get('owner'):
|
|
playlist_data['owner'] = {'display_name': 'Unknown Owner', 'id': 'unknown'}
|
|
elif not playlist_data['owner'].get('display_name'):
|
|
playlist_data['owner']['display_name'] = 'Unknown'
|
|
|
|
# Create playlist with empty tracks list for now
|
|
playlist = Playlist.from_spotify_playlist(playlist_data, [])
|
|
playlists.append(playlist)
|
|
batch_count += 1
|
|
|
|
except Exception as p_error:
|
|
p_name = playlist_data.get('name', 'Unknown') if playlist_data else 'None'
|
|
logger.warning(f"Skipping malformed playlist '{p_name}': {p_error}")
|
|
|
|
total_fetched += batch_count
|
|
logger.info(f"Retrieved {batch_count} playlists in batch (offset {offset}), total so far: {total_fetched}")
|
|
|
|
# Check if we've fetched all playlists
|
|
if len(results['items']) < limit or not results.get('next'):
|
|
break
|
|
|
|
offset += limit
|
|
|
|
logger.info(f"Retrieved {len(playlists)} total playlist metadata")
|
|
return playlists
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching user playlists metadata: {e}")
|
|
# Return partial results if we crashed mid-way but have some data
|
|
if playlists:
|
|
logger.info(f"Returning {len(playlists)} playlists fetched before error.")
|
|
return playlists
|
|
return []
|
|
|
|
@rate_limited
|
|
def get_saved_tracks_count(self) -> int:
|
|
"""Get the total count of user's saved/liked songs without fetching all tracks"""
|
|
if not self.is_spotify_authenticated():
|
|
logger.error("Not authenticated with Spotify")
|
|
return 0
|
|
|
|
try:
|
|
# Just fetch first page to get the total count
|
|
results = self.sp.current_user_saved_tracks(limit=1)
|
|
if results and 'total' in results:
|
|
total_count = results['total']
|
|
logger.info(f"User has {total_count} saved tracks")
|
|
return total_count
|
|
return 0
|
|
except Exception as e:
|
|
logger.error(f"Error fetching saved tracks count: {e}")
|
|
return 0
|
|
|
|
@rate_limited
|
|
def get_saved_tracks(self) -> List[Track]:
|
|
"""Fetch all user's saved/liked songs from Spotify"""
|
|
if not self.is_spotify_authenticated():
|
|
logger.error("Not authenticated with Spotify")
|
|
return []
|
|
|
|
tracks = []
|
|
|
|
try:
|
|
limit = 50 # Maximum allowed by Spotify API
|
|
offset = 0
|
|
total_fetched = 0
|
|
|
|
while True:
|
|
results = self.sp.current_user_saved_tracks(limit=limit, offset=offset)
|
|
|
|
if not results or 'items' not in results:
|
|
break
|
|
|
|
batch_count = 0
|
|
for item in results['items']:
|
|
if item['track'] and item['track']['id']:
|
|
track = Track.from_spotify_track(item['track'])
|
|
tracks.append(track)
|
|
batch_count += 1
|
|
|
|
total_fetched += batch_count
|
|
logger.info(f"Retrieved {batch_count} saved tracks in batch (offset {offset}), total: {total_fetched}")
|
|
|
|
# Check if we've fetched all saved tracks
|
|
if len(results['items']) < limit or not results.get('next'):
|
|
break
|
|
|
|
offset += limit
|
|
|
|
logger.info(f"Retrieved {len(tracks)} total saved tracks")
|
|
return tracks
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching saved tracks: {e}")
|
|
return []
|
|
|
|
@rate_limited
|
|
def _get_playlist_tracks(self, playlist_id: str) -> List[Track]:
|
|
if not self.is_spotify_authenticated():
|
|
return []
|
|
|
|
tracks = []
|
|
|
|
try:
|
|
results = self.sp.playlist_items(playlist_id, limit=100)
|
|
|
|
while results:
|
|
for item in results['items']:
|
|
# Handle both old API ('track') and new Feb 2026 API ('item') field names
|
|
track_data = item.get('track') or item.get('item')
|
|
if track_data and track_data.get('id'):
|
|
track = Track.from_spotify_track(track_data)
|
|
tracks.append(track)
|
|
|
|
results = self.sp.next(results) if results['next'] else None
|
|
|
|
return tracks
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching playlist tracks: {e}")
|
|
return []
|
|
|
|
@rate_limited
|
|
def get_playlist_by_id(self, playlist_id: str) -> Optional[Playlist]:
|
|
if not self.is_spotify_authenticated():
|
|
return None
|
|
|
|
try:
|
|
playlist_data = self.sp.playlist(playlist_id)
|
|
tracks = self._get_playlist_tracks(playlist_id)
|
|
return Playlist.from_spotify_playlist(playlist_data, tracks)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching playlist {playlist_id}: {e}")
|
|
return None
|
|
|
|
@rate_limited
|
|
def search_tracks(self, query: str, limit: int = 10) -> List[Track]:
|
|
"""Search for tracks - falls back to iTunes if Spotify not authenticated"""
|
|
if self.is_spotify_authenticated():
|
|
try:
|
|
results = self.sp.search(q=query, type='track', limit=min(limit, 10))
|
|
tracks = []
|
|
|
|
for track_data in results['tracks']['items']:
|
|
track = Track.from_spotify_track(track_data)
|
|
tracks.append(track)
|
|
|
|
return tracks
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error searching tracks via Spotify: {e}")
|
|
# Fall through to iTunes fallback
|
|
|
|
# iTunes fallback
|
|
logger.debug(f"Using iTunes fallback for track search: {query}")
|
|
return self._itunes.search_tracks(query, limit)
|
|
|
|
@rate_limited
|
|
def search_artists(self, query: str, limit: int = 10) -> List[Artist]:
|
|
"""Search for artists - falls back to iTunes if Spotify not authenticated"""
|
|
if self.is_spotify_authenticated():
|
|
try:
|
|
results = self.sp.search(q=query, type='artist', limit=min(limit, 10))
|
|
artists = []
|
|
|
|
for artist_data in results['artists']['items']:
|
|
artist = Artist.from_spotify_artist(artist_data)
|
|
artists.append(artist)
|
|
|
|
return artists
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error searching artists via Spotify: {e}")
|
|
# Fall through to iTunes fallback
|
|
|
|
# iTunes fallback
|
|
logger.debug(f"Using iTunes fallback for artist search: {query}")
|
|
return self._itunes.search_artists(query, limit)
|
|
|
|
@rate_limited
|
|
def search_albums(self, query: str, limit: int = 10) -> List[Album]:
|
|
"""Search for albums - falls back to iTunes if Spotify not authenticated"""
|
|
if self.is_spotify_authenticated():
|
|
try:
|
|
results = self.sp.search(q=query, type='album', limit=min(limit, 10))
|
|
albums = []
|
|
|
|
for album_data in results['albums']['items']:
|
|
album = Album.from_spotify_album(album_data)
|
|
albums.append(album)
|
|
|
|
return albums
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error searching albums via Spotify: {e}")
|
|
# Fall through to iTunes fallback
|
|
|
|
# iTunes fallback
|
|
logger.debug(f"Using iTunes fallback for album search: {query}")
|
|
return self._itunes.search_albums(query, limit)
|
|
|
|
@rate_limited
|
|
def get_track_details(self, track_id: str) -> Optional[Dict[str, Any]]:
|
|
"""Get detailed track information - falls back to iTunes if Spotify not authenticated"""
|
|
if self.is_spotify_authenticated():
|
|
try:
|
|
track_data = self.sp.track(track_id)
|
|
|
|
# Enhance with additional useful metadata for our purposes
|
|
if track_data:
|
|
enhanced_data = {
|
|
'id': track_data['id'],
|
|
'name': track_data['name'],
|
|
'track_number': track_data['track_number'],
|
|
'disc_number': track_data['disc_number'],
|
|
'duration_ms': track_data['duration_ms'],
|
|
'explicit': track_data['explicit'],
|
|
'artists': [artist['name'] for artist in track_data['artists']],
|
|
'primary_artist': track_data['artists'][0]['name'] if track_data['artists'] else None,
|
|
'album': {
|
|
'id': track_data['album']['id'],
|
|
'name': track_data['album']['name'],
|
|
'total_tracks': track_data['album']['total_tracks'],
|
|
'release_date': track_data['album']['release_date'],
|
|
'album_type': track_data['album']['album_type'],
|
|
'artists': [artist['name'] for artist in track_data['album']['artists']]
|
|
},
|
|
'is_album_track': track_data['album']['total_tracks'] > 1,
|
|
'raw_data': track_data # Keep original for fallback
|
|
}
|
|
return enhanced_data
|
|
return track_data
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching track details via Spotify: {e}")
|
|
# Fall through to iTunes fallback
|
|
|
|
# iTunes fallback - only if ID is numeric (iTunes format)
|
|
if self._is_itunes_id(track_id):
|
|
logger.debug(f"Using iTunes fallback for track details: {track_id}")
|
|
return self._itunes.get_track_details(track_id)
|
|
else:
|
|
logger.debug(f"Cannot use iTunes fallback for Spotify track ID: {track_id}")
|
|
return None
|
|
|
|
@rate_limited
|
|
def get_track_features(self, track_id: str) -> Optional[Dict[str, Any]]:
|
|
if not self.is_spotify_authenticated():
|
|
return None
|
|
|
|
try:
|
|
features = self.sp.audio_features(track_id)
|
|
return features[0] if features else None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching track features: {e}")
|
|
return None
|
|
|
|
@rate_limited
|
|
def get_album(self, album_id: str) -> Optional[Dict[str, Any]]:
|
|
"""Get album information - falls back to iTunes if Spotify not authenticated"""
|
|
if self.is_spotify_authenticated():
|
|
try:
|
|
album_data = self.sp.album(album_id)
|
|
return album_data
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching album via Spotify: {e}")
|
|
# Fall through to iTunes fallback
|
|
|
|
# iTunes fallback - only if ID is numeric (iTunes format)
|
|
if self._is_itunes_id(album_id):
|
|
logger.debug(f"Using iTunes fallback for album: {album_id}")
|
|
return self._itunes.get_album(album_id)
|
|
else:
|
|
logger.debug(f"Cannot use iTunes fallback for Spotify album ID: {album_id}")
|
|
return None
|
|
|
|
@rate_limited
|
|
def get_album_tracks(self, album_id: str) -> Optional[Dict[str, Any]]:
|
|
"""Get album tracks - falls back to iTunes if Spotify not authenticated"""
|
|
if self.is_spotify_authenticated():
|
|
try:
|
|
# Get first page of tracks
|
|
first_page = self.sp.album_tracks(album_id)
|
|
if not first_page or 'items' not in first_page:
|
|
return None
|
|
|
|
# Collect all tracks starting with first page
|
|
all_tracks = first_page['items'][:]
|
|
|
|
# Fetch remaining pages if they exist
|
|
next_page = first_page
|
|
while next_page.get('next'):
|
|
next_page = self.sp.next(next_page)
|
|
if next_page and 'items' in next_page:
|
|
all_tracks.extend(next_page['items'])
|
|
|
|
# Log success
|
|
logger.info(f"Retrieved {len(all_tracks)} tracks for album {album_id}")
|
|
|
|
# Return structure with all tracks
|
|
result = first_page.copy()
|
|
result['items'] = all_tracks
|
|
result['next'] = None # No more pages
|
|
result['limit'] = len(all_tracks) # Update to reflect all tracks fetched
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching album tracks via Spotify: {e}")
|
|
# Fall through to iTunes fallback
|
|
|
|
# iTunes fallback - only if ID is numeric (iTunes format)
|
|
if self._is_itunes_id(album_id):
|
|
logger.debug(f"Using iTunes fallback for album tracks: {album_id}")
|
|
return self._itunes.get_album_tracks(album_id)
|
|
else:
|
|
logger.debug(f"Cannot use iTunes fallback for Spotify album ID: {album_id}")
|
|
return None
|
|
|
|
@rate_limited
|
|
def get_artist_albums(self, artist_id: str, album_type: str = 'album,single', limit: int = 10) -> List[Album]:
|
|
"""Get albums by artist ID - falls back to iTunes if Spotify not authenticated"""
|
|
if self.is_spotify_authenticated():
|
|
try:
|
|
albums = []
|
|
results = self.sp.artist_albums(artist_id, album_type=album_type, limit=min(limit, 10))
|
|
|
|
while results:
|
|
for album_data in results['items']:
|
|
album = Album.from_spotify_album(album_data)
|
|
albums.append(album)
|
|
|
|
# Get next batch if available
|
|
results = self.sp.next(results) if results['next'] else None
|
|
|
|
logger.info(f"Retrieved {len(albums)} albums for artist {artist_id}")
|
|
return albums
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching artist albums via Spotify: {e}")
|
|
# Fall through to iTunes fallback
|
|
|
|
# iTunes fallback - only if ID is numeric (iTunes format)
|
|
if self._is_itunes_id(artist_id):
|
|
logger.debug(f"Using iTunes fallback for artist albums: {artist_id}")
|
|
return self._itunes.get_artist_albums(artist_id, album_type, limit)
|
|
else:
|
|
logger.debug(f"Cannot use iTunes fallback for Spotify artist ID: {artist_id}")
|
|
return []
|
|
|
|
@rate_limited
|
|
def get_user_info(self) -> Optional[Dict[str, Any]]:
|
|
if not self.is_spotify_authenticated():
|
|
return None
|
|
|
|
try:
|
|
return self.sp.current_user()
|
|
except Exception as e:
|
|
logger.error(f"Error fetching user info: {e}")
|
|
return None
|
|
|
|
@rate_limited
|
|
def get_artist(self, artist_id: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get full artist details - falls back to iTunes if Spotify not authenticated.
|
|
|
|
Args:
|
|
artist_id: Artist ID (Spotify or iTunes depending on authentication)
|
|
|
|
Returns:
|
|
Dictionary with artist data including images, genres, popularity
|
|
"""
|
|
if self.is_spotify_authenticated():
|
|
try:
|
|
return self.sp.artist(artist_id)
|
|
except Exception as e:
|
|
logger.error(f"Error fetching artist via Spotify: {e}")
|
|
# Fall through to iTunes fallback
|
|
|
|
# iTunes fallback - only if ID is numeric (iTunes format)
|
|
if self._is_itunes_id(artist_id):
|
|
logger.debug(f"Using iTunes fallback for artist: {artist_id}")
|
|
return self._itunes.get_artist(artist_id)
|
|
else:
|
|
logger.debug(f"Cannot use iTunes fallback for Spotify artist ID: {artist_id}")
|
|
return None |