|
|
import os
|
|
|
import requests
|
|
|
import time
|
|
|
import re
|
|
|
import threading
|
|
|
from typing import Dict, List, Optional, Any
|
|
|
from functools import wraps
|
|
|
from dataclasses import dataclass
|
|
|
from utils.logging_config import get_logger
|
|
|
from config.settings import config_manager
|
|
|
import json
|
|
|
import base64
|
|
|
import webbrowser
|
|
|
import urllib.parse
|
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
|
import socketserver
|
|
|
import hashlib
|
|
|
import secrets
|
|
|
|
|
|
logger = get_logger("tidal_client")
|
|
|
|
|
|
# Virtual playlist identity for the user's Favorite Tracks (Tidal's
|
|
|
# "My Collection" view). Treated like a normal playlist by every
|
|
|
# get_playlist consumer (mirror auto-refresh, discovery, sync UI) —
|
|
|
# the client recognizes the ID and dispatches to the dedicated
|
|
|
# `userCollectionTracks` endpoint internally. ID intentionally has no
|
|
|
# colon: the sync-services.js renderer interpolates IDs into CSS
|
|
|
# selectors via template literals (e.g. `#tidal-card-${p.id} .foo`)
|
|
|
# and a `:` in the ID would be parsed as a CSS pseudo-class operator.
|
|
|
COLLECTION_PLAYLIST_ID = "tidal-favorites"
|
|
|
COLLECTION_PLAYLIST_NAME = "Favorite Tracks"
|
|
|
COLLECTION_PLAYLIST_DESCRIPTION = "Your favorited tracks on Tidal"
|
|
|
|
|
|
# Global rate limiting variables
|
|
|
_last_api_call_time = 0
|
|
|
_api_call_lock = threading.Lock()
|
|
|
MIN_API_INTERVAL = 0.5 # 500ms between API calls
|
|
|
|
|
|
def rate_limited(func):
|
|
|
"""Decorator to enforce rate limiting on Tidal API calls with retry logic"""
|
|
|
@wraps(func)
|
|
|
def wrapper(*args, **kwargs):
|
|
|
max_retries = 4
|
|
|
last_exception = None
|
|
|
|
|
|
for attempt in range(max_retries):
|
|
|
global _last_api_call_time
|
|
|
|
|
|
with _api_call_lock:
|
|
|
current_time = time.time()
|
|
|
time_since_last_call = current_time - _last_api_call_time
|
|
|
|
|
|
if time_since_last_call < MIN_API_INTERVAL:
|
|
|
sleep_time = MIN_API_INTERVAL - time_since_last_call
|
|
|
time.sleep(sleep_time)
|
|
|
|
|
|
_last_api_call_time = time.time()
|
|
|
|
|
|
from core.api_call_tracker import api_call_tracker
|
|
|
api_call_tracker.record_call('tidal')
|
|
|
|
|
|
try:
|
|
|
result = func(*args, **kwargs)
|
|
|
return result
|
|
|
except Exception as e:
|
|
|
last_exception = e
|
|
|
error_str = str(e)
|
|
|
|
|
|
# Only retry on specific errors
|
|
|
if "rate limit" in error_str.lower() or "429" in error_str:
|
|
|
backoff = 3.0 * (2 ** attempt) # Exponential: 3s, 6s, 12s, 24s
|
|
|
logger.warning(f"Rate limit hit on attempt {attempt + 1}/{max_retries}, backing off {backoff}s: {e}")
|
|
|
if attempt < max_retries - 1:
|
|
|
time.sleep(backoff)
|
|
|
continue
|
|
|
elif "503" in error_str or "502" in error_str:
|
|
|
logger.warning(f"Tidal service error on attempt {attempt + 1}/{max_retries}, backing off: {e}")
|
|
|
if attempt < max_retries - 1:
|
|
|
time.sleep(2.0)
|
|
|
continue
|
|
|
|
|
|
# For other errors, don't retry
|
|
|
raise
|
|
|
|
|
|
# If we exhausted retries, raise the last exception
|
|
|
raise last_exception
|
|
|
return wrapper
|
|
|
|
|
|
@dataclass
|
|
|
class Track:
|
|
|
"""Tidal track data structure compatible with existing Track objects"""
|
|
|
id: str
|
|
|
name: str
|
|
|
artists: List[str]
|
|
|
album: str = ""
|
|
|
duration_ms: int = 0
|
|
|
external_urls: Dict[str, str] = None
|
|
|
popularity: int = 0
|
|
|
explicit: bool = False
|
|
|
|
|
|
def __post_init__(self):
|
|
|
if self.external_urls is None:
|
|
|
self.external_urls = {}
|
|
|
|
|
|
@dataclass
|
|
|
class Playlist:
|
|
|
"""Tidal playlist data structure compatible with existing Playlist objects"""
|
|
|
id: str
|
|
|
name: str
|
|
|
description: str = ""
|
|
|
tracks: List[Track] = None
|
|
|
external_urls: Dict[str, str] = None
|
|
|
owner: Optional[Dict[str, Any]] = None
|
|
|
public: bool = True
|
|
|
|
|
|
def __post_init__(self):
|
|
|
if self.tracks is None:
|
|
|
self.tracks = []
|
|
|
if self.external_urls is None:
|
|
|
self.external_urls = {}
|
|
|
|
|
|
class TidalClient:
|
|
|
"""Tidal API client for fetching user playlists and track data"""
|
|
|
|
|
|
def __init__(self):
|
|
|
self.client_id = None
|
|
|
self.client_secret = None
|
|
|
self.access_token = None
|
|
|
self.refresh_token = None
|
|
|
self.token_expires_at = 0
|
|
|
self.base_url = "https://openapi.tidal.com/v2"
|
|
|
self.alt_base_url = "https://api.tidal.com/v1" # Alternative API base
|
|
|
self.auth_url = "https://login.tidal.com/authorize"
|
|
|
self.token_url = "https://auth.tidal.com/v1/oauth2/token"
|
|
|
_tidal_port = int(os.environ.get('SOULSYNC_TIDAL_CALLBACK_PORT', 8889))
|
|
|
self.redirect_uri = f"http://127.0.0.1:{_tidal_port}/tidal/callback" # Default, will be updated from config
|
|
|
self.session = requests.Session()
|
|
|
self.auth_server = None
|
|
|
self.auth_code = None
|
|
|
self.code_verifier = None
|
|
|
self.code_challenge = None
|
|
|
|
|
|
self._load_config()
|
|
|
self._setup_session()
|
|
|
|
|
|
# Try to load saved tokens
|
|
|
self._load_saved_tokens()
|
|
|
|
|
|
def _load_config(self):
|
|
|
"""Load Tidal configuration from settings"""
|
|
|
try:
|
|
|
tidal_config = config_manager.get('tidal', {})
|
|
|
self.client_id = tidal_config.get('client_id')
|
|
|
self.client_secret = tidal_config.get('client_secret')
|
|
|
self.redirect_uri = tidal_config.get('redirect_uri', self.redirect_uri) # Use config or default
|
|
|
|
|
|
if not self.client_id or not self.client_secret:
|
|
|
logger.warning("Tidal client ID or secret not configured")
|
|
|
return False
|
|
|
|
|
|
logger.info(f"Loaded Tidal config with client ID: {self.client_id[:8]}...")
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
logger.error(f"Failed to load Tidal configuration: {e}")
|
|
|
return False
|
|
|
|
|
|
def _setup_session(self):
|
|
|
"""Setup requests session with headers"""
|
|
|
self.session.headers.update({
|
|
|
'Accept': 'application/vnd.api+json',
|
|
|
'User-Agent': 'SoulSync/1.0'
|
|
|
})
|
|
|
|
|
|
def _load_saved_tokens(self):
|
|
|
"""Load saved tokens from config"""
|
|
|
try:
|
|
|
tidal_tokens = config_manager.get('tidal_tokens', {})
|
|
|
self.access_token = tidal_tokens.get('access_token')
|
|
|
self.refresh_token = tidal_tokens.get('refresh_token')
|
|
|
self.token_expires_at = tidal_tokens.get('expires_at', 0)
|
|
|
|
|
|
if self.access_token:
|
|
|
self.session.headers['Authorization'] = f'Bearer {self.access_token}'
|
|
|
logger.info("Loaded saved Tidal tokens")
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error loading saved Tidal tokens: {e}")
|
|
|
|
|
|
def _save_tokens(self):
|
|
|
"""Save tokens to config"""
|
|
|
try:
|
|
|
tidal_tokens = {
|
|
|
'access_token': self.access_token,
|
|
|
'refresh_token': self.refresh_token,
|
|
|
'expires_at': self.token_expires_at
|
|
|
}
|
|
|
config_manager.set('tidal_tokens', tidal_tokens)
|
|
|
logger.info("Saved Tidal tokens")
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error saving Tidal tokens: {e}")
|
|
|
|
|
|
def _parse_json_api_track(self, track_data: Dict[str, Any], artist_details_map: Dict[str, Any] = None) -> Optional[Track]:
|
|
|
"""Parse a track from a JSON:API 'included' object with artist details."""
|
|
|
try:
|
|
|
track_id = track_data.get('id')
|
|
|
if not track_id:
|
|
|
return None
|
|
|
|
|
|
attributes = track_data.get('attributes', {})
|
|
|
|
|
|
# Parse artists from relationships and artist details map
|
|
|
artists = []
|
|
|
if artist_details_map:
|
|
|
relationships = track_data.get('relationships', {})
|
|
|
artist_relationships = relationships.get('artists', {}).get('data', [])
|
|
|
|
|
|
for artist_ref in artist_relationships:
|
|
|
artist_id = artist_ref.get('id')
|
|
|
if artist_id and artist_id in artist_details_map:
|
|
|
artist_data = artist_details_map[artist_id]
|
|
|
artist_attributes = artist_data.get('attributes', {})
|
|
|
artist_name = artist_attributes.get('name', 'Unknown Artist')
|
|
|
artists.append(artist_name)
|
|
|
|
|
|
# Fallback if no artists found
|
|
|
if not artists:
|
|
|
artists = ['Unknown Artist']
|
|
|
|
|
|
# Append version info (e.g. "Bloom remix") to title if present
|
|
|
track_title = attributes.get('title', 'Unknown Track')
|
|
|
track_version = attributes.get('version') or ''
|
|
|
if track_version and track_version.lower() not in track_title.lower():
|
|
|
track_title = f"{track_title} ({track_version})"
|
|
|
|
|
|
return Track(
|
|
|
id=str(track_id),
|
|
|
name=track_title,
|
|
|
artists=artists,
|
|
|
duration_ms=attributes.get('duration', 0) * 1000 if attributes.get('duration') else 0, # Convert to ms
|
|
|
external_urls={'tidal': f"https://tidal.com/browse/track/{track_id}"},
|
|
|
explicit=attributes.get('explicit', False)
|
|
|
)
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error parsing JSON:API track data: {e}")
|
|
|
return None
|
|
|
|
|
|
|
|
|
def _generate_pkce_challenge(self):
|
|
|
"""Generate PKCE code verifier and challenge"""
|
|
|
# Generate a random code verifier (43-128 characters)
|
|
|
self.code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8').rstrip('=')
|
|
|
|
|
|
# Create code challenge (SHA256 hash of verifier, base64 URL-encoded)
|
|
|
challenge_bytes = hashlib.sha256(self.code_verifier.encode('utf-8')).digest()
|
|
|
self.code_challenge = base64.urlsafe_b64encode(challenge_bytes).decode('utf-8').rstrip('=')
|
|
|
|
|
|
logger.info(f"Generated PKCE verifier: {self.code_verifier[:10]}...")
|
|
|
logger.info(f"Generated PKCE challenge: {self.code_challenge[:10]}...")
|
|
|
|
|
|
def authenticate(self):
|
|
|
"""Start OAuth authentication flow"""
|
|
|
try:
|
|
|
if not self.client_id:
|
|
|
logger.error("Tidal client ID not configured")
|
|
|
return False
|
|
|
|
|
|
# Generate PKCE challenge
|
|
|
self._generate_pkce_challenge()
|
|
|
|
|
|
# Create OAuth URL with PKCE.
|
|
|
# `prompt=consent` forces Tidal to display the consent
|
|
|
# screen even when the app is already authorized — without
|
|
|
# it, re-authenticating with newly-added scopes (e.g.
|
|
|
# `collection.read` added in v2.5.0) can silently return a
|
|
|
# token carrying only the ORIGINAL scope set because Tidal
|
|
|
# treats the existing authorization as still valid.
|
|
|
params = {
|
|
|
'response_type': 'code',
|
|
|
'client_id': self.client_id,
|
|
|
'redirect_uri': self.redirect_uri,
|
|
|
'scope': 'user.read playlists.read collection.read', # collection.read needed for userCollectionTracks endpoint
|
|
|
'code_challenge': self.code_challenge,
|
|
|
'code_challenge_method': 'S256',
|
|
|
'prompt': 'consent',
|
|
|
}
|
|
|
|
|
|
auth_url = f"{self.auth_url}?" + urllib.parse.urlencode(params)
|
|
|
|
|
|
logger.info("Starting Tidal OAuth flow...")
|
|
|
logger.info(f"OAuth URL: {auth_url}")
|
|
|
logger.info(f"Redirect URI: {self.redirect_uri}")
|
|
|
|
|
|
# Start callback server
|
|
|
self._start_callback_server()
|
|
|
|
|
|
# Open browser
|
|
|
webbrowser.open(auth_url)
|
|
|
|
|
|
# Wait for callback (with timeout)
|
|
|
timeout = 120 # 2 minutes
|
|
|
start_time = time.time()
|
|
|
|
|
|
while not self.auth_code and time.time() - start_time < timeout:
|
|
|
time.sleep(0.1)
|
|
|
|
|
|
# Stop server
|
|
|
if self.auth_server:
|
|
|
self.auth_server.shutdown()
|
|
|
self.auth_server = None
|
|
|
|
|
|
if not self.auth_code:
|
|
|
logger.error("Tidal OAuth timeout - no authorization code received")
|
|
|
return False
|
|
|
|
|
|
# Exchange code for tokens
|
|
|
return self._exchange_code_for_tokens()
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in Tidal OAuth flow: {e}")
|
|
|
return False
|
|
|
|
|
|
def _start_callback_server(self):
|
|
|
"""Start HTTP server to receive OAuth callback"""
|
|
|
# Skip starting server in Docker/production mode - web server handles callbacks
|
|
|
import os
|
|
|
if os.getenv('FLASK_ENV') == 'production' or os.path.exists('/.dockerenv'):
|
|
|
logger.info("Docker/WebUI mode detected - skipping TidalClient callback server (web server handles callbacks)")
|
|
|
return
|
|
|
|
|
|
# Store reference to self for the callback handler
|
|
|
tidal_client_ref = self
|
|
|
|
|
|
class CallbackHandler(BaseHTTPRequestHandler):
|
|
|
def do_GET(handler_self):
|
|
|
parsed_url = urllib.parse.urlparse(handler_self.path)
|
|
|
query_params = urllib.parse.parse_qs(parsed_url.query)
|
|
|
|
|
|
# Debug: Log the full callback URL and parameters
|
|
|
logger.info(f"Tidal callback received: {handler_self.path}")
|
|
|
logger.info(f"Query parameters: {query_params}")
|
|
|
|
|
|
if 'code' in query_params:
|
|
|
tidal_client_ref.auth_code = query_params['code'][0]
|
|
|
logger.info(f"Received Tidal authorization code: {tidal_client_ref.auth_code[:10]}...")
|
|
|
|
|
|
# Send success response
|
|
|
handler_self.send_response(200)
|
|
|
handler_self.send_header('Content-type', 'text/html')
|
|
|
handler_self.end_headers()
|
|
|
handler_self.wfile.write(b'<h1>Success!</h1><p>You can close this window and return to SoulSync.</p>')
|
|
|
elif 'error' in query_params:
|
|
|
# Handle OAuth errors
|
|
|
error = query_params.get('error', ['unknown'])[0]
|
|
|
error_description = query_params.get('error_description', ['No description'])[0]
|
|
|
logger.error(f"Tidal OAuth error: {error} - {error_description}")
|
|
|
|
|
|
handler_self.send_response(400)
|
|
|
handler_self.send_header('Content-type', 'text/html')
|
|
|
handler_self.end_headers()
|
|
|
handler_self.wfile.write(f'<h1>OAuth Error</h1><p>Error: {error}</p><p>Description: {error_description}</p>'.encode())
|
|
|
else:
|
|
|
logger.error("No authorization code or error in Tidal callback")
|
|
|
handler_self.send_response(400)
|
|
|
handler_self.send_header('Content-type', 'text/html')
|
|
|
handler_self.end_headers()
|
|
|
handler_self.wfile.write(b'<h1>Error</h1><p>Authorization failed - no code received.</p>')
|
|
|
|
|
|
def log_message(handler_self, format, *args):
|
|
|
pass # Suppress server logs
|
|
|
|
|
|
try:
|
|
|
port = int(os.environ.get('SOULSYNC_TIDAL_CALLBACK_PORT', 8889))
|
|
|
self.auth_server = HTTPServer(('localhost', port), CallbackHandler)
|
|
|
server_thread = threading.Thread(target=self.auth_server.serve_forever)
|
|
|
server_thread.daemon = True
|
|
|
server_thread.start()
|
|
|
logger.info(f"Started Tidal callback server on port {port}")
|
|
|
except Exception as e:
|
|
|
logger.error(f"Failed to start Tidal callback server: {e}")
|
|
|
|
|
|
@rate_limited
|
|
|
def _exchange_code_for_tokens(self):
|
|
|
"""Exchange authorization code for access tokens"""
|
|
|
try:
|
|
|
data = {
|
|
|
'grant_type': 'authorization_code',
|
|
|
'code': self.auth_code,
|
|
|
'redirect_uri': self.redirect_uri,
|
|
|
'client_id': self.client_id,
|
|
|
'client_secret': self.client_secret,
|
|
|
'code_verifier': self.code_verifier
|
|
|
}
|
|
|
|
|
|
response = self.session.post(
|
|
|
self.token_url,
|
|
|
data=data,
|
|
|
headers={
|
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
|
'Accept': 'application/json',
|
|
|
},
|
|
|
timeout=10
|
|
|
)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
token_data = response.json()
|
|
|
self.access_token = token_data.get('access_token')
|
|
|
self.refresh_token = token_data.get('refresh_token')
|
|
|
expires_in = token_data.get('expires_in', 3600)
|
|
|
self.token_expires_at = time.time() + expires_in - 60
|
|
|
|
|
|
# Update session headers
|
|
|
self.session.headers['Authorization'] = f'Bearer {self.access_token}'
|
|
|
|
|
|
# Save tokens
|
|
|
self._save_tokens()
|
|
|
|
|
|
logger.info("Successfully exchanged Tidal code for tokens")
|
|
|
return True
|
|
|
else:
|
|
|
logger.error(f"Failed to exchange Tidal code: {response.status_code} - {response.text}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error exchanging Tidal code for tokens: {e}")
|
|
|
return False
|
|
|
|
|
|
@rate_limited
|
|
|
def _refresh_access_token(self):
|
|
|
"""Refresh the access token using refresh token"""
|
|
|
try:
|
|
|
if not self.refresh_token:
|
|
|
logger.error("No Tidal refresh token available")
|
|
|
return False
|
|
|
|
|
|
if not self.client_id or not self.client_secret:
|
|
|
logger.debug("Tidal client_id/secret not configured — skipping token refresh")
|
|
|
# Clear stale tokens so we stop retrying
|
|
|
self.access_token = None
|
|
|
self.refresh_token = None
|
|
|
self.token_expires_at = 0
|
|
|
return False
|
|
|
|
|
|
data = {
|
|
|
'grant_type': 'refresh_token',
|
|
|
'refresh_token': self.refresh_token,
|
|
|
'client_id': self.client_id,
|
|
|
'client_secret': self.client_secret
|
|
|
}
|
|
|
|
|
|
response = self.session.post(
|
|
|
self.token_url,
|
|
|
data=data,
|
|
|
headers={
|
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
|
'Accept': 'application/json',
|
|
|
},
|
|
|
timeout=10
|
|
|
)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
token_data = response.json()
|
|
|
self.access_token = token_data.get('access_token')
|
|
|
expires_in = token_data.get('expires_in', 3600)
|
|
|
self.token_expires_at = time.time() + expires_in - 60
|
|
|
|
|
|
# Update refresh token if provided
|
|
|
if 'refresh_token' in token_data:
|
|
|
self.refresh_token = token_data['refresh_token']
|
|
|
|
|
|
# Update session headers
|
|
|
self.session.headers['Authorization'] = f'Bearer {self.access_token}'
|
|
|
|
|
|
# Save tokens
|
|
|
self._save_tokens()
|
|
|
|
|
|
logger.info("Successfully refreshed Tidal access token")
|
|
|
return True
|
|
|
else:
|
|
|
logger.error(f"Failed to refresh Tidal token: {response.status_code} - {response.text}")
|
|
|
return False
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error refreshing Tidal token: {e}")
|
|
|
return False
|
|
|
|
|
|
def fetch_token_from_code(self, auth_code: str) -> bool:
|
|
|
"""Exchange authorization code for access tokens (for web server callback)"""
|
|
|
try:
|
|
|
logger.info(f"Starting token exchange with code: {auth_code[:20]}...")
|
|
|
logger.info(f"Using code_verifier: {self.code_verifier[:20] if self.code_verifier else 'None'}...")
|
|
|
logger.info(f"Using redirect_uri: {self.redirect_uri}")
|
|
|
|
|
|
self.auth_code = auth_code
|
|
|
result = self._exchange_code_for_tokens()
|
|
|
|
|
|
if result:
|
|
|
logger.info("Token exchange successful")
|
|
|
else:
|
|
|
logger.error("Token exchange failed")
|
|
|
|
|
|
return result
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in fetch_token_from_code: {e}")
|
|
|
import traceback
|
|
|
logger.error(f"Full traceback: {traceback.format_exc()}")
|
|
|
return False
|
|
|
|
|
|
def _ensure_valid_token(self):
|
|
|
"""Ensure we have a valid access token"""
|
|
|
if not self.access_token:
|
|
|
logger.info("No Tidal access token - need to authenticate")
|
|
|
return self.authenticate()
|
|
|
|
|
|
if time.time() >= self.token_expires_at:
|
|
|
logger.info("Tidal access token expired - refreshing...")
|
|
|
if self.refresh_token:
|
|
|
return self._refresh_access_token()
|
|
|
else:
|
|
|
logger.info("No refresh token - need to re-authenticate")
|
|
|
return self.authenticate()
|
|
|
|
|
|
return True
|
|
|
|
|
|
def is_authenticated(self):
|
|
|
"""Check if client is authenticated, refreshing expired tokens if possible"""
|
|
|
if self.access_token and time.time() < self.token_expires_at:
|
|
|
return True
|
|
|
|
|
|
# Backoff: if refresh recently failed, don't retry for 5 minutes
|
|
|
if hasattr(self, '_refresh_failed_at') and self._refresh_failed_at:
|
|
|
if time.time() - self._refresh_failed_at < 300:
|
|
|
return False
|
|
|
|
|
|
# Token expired but refresh token available — try silent refresh
|
|
|
if self.access_token and self.refresh_token:
|
|
|
logger.info("Tidal access token expired — attempting silent refresh...")
|
|
|
result = self._refresh_access_token()
|
|
|
if not result:
|
|
|
self._refresh_failed_at = time.time()
|
|
|
return result
|
|
|
|
|
|
return False
|
|
|
|
|
|
def disconnect(self):
|
|
|
"""Clear all saved Tidal auth state so the next OAuth flow
|
|
|
starts fresh with the current scope set.
|
|
|
|
|
|
Used when a previously-authorized token doesn't carry a newly-
|
|
|
added scope (e.g. `collection.read`): even with `prompt=consent`
|
|
|
on the auth URL, some users hit a Tidal flow that rebinds the
|
|
|
existing grant. Disconnect first → re-authenticate forces a
|
|
|
clean slate."""
|
|
|
self.access_token = None
|
|
|
self.refresh_token = None
|
|
|
self.token_expires_at = 0
|
|
|
self._collection_needs_reconnect = False
|
|
|
self.session.headers.pop('Authorization', None)
|
|
|
try:
|
|
|
config_manager.set('tidal_tokens', {})
|
|
|
except Exception as e:
|
|
|
logger.warning(f"Failed to clear tidal_tokens config: {e}")
|
|
|
logger.info("Tidal client disconnected — saved tokens cleared")
|
|
|
|
|
|
def _get_user_id(self):
|
|
|
"""Get current user's ID from /users/me endpoint"""
|
|
|
try:
|
|
|
endpoints_to_try = [
|
|
|
# V2 API (Prioritize this as it matches your documentation)
|
|
|
(f"{self.base_url}/users/me", "v2"),
|
|
|
(f"{self.base_url}/me", "v2 alt"),
|
|
|
# V1 API
|
|
|
(f"{self.alt_base_url}/users/me", "v1")
|
|
|
]
|
|
|
|
|
|
for endpoint, version in endpoints_to_try:
|
|
|
try:
|
|
|
logger.info(f"Trying to get user ID from {version}: {endpoint}")
|
|
|
|
|
|
if version == "v1":
|
|
|
headers = {
|
|
|
'Accept': 'application/json',
|
|
|
'Authorization': f'Bearer {self.access_token}',
|
|
|
'User-Agent': 'TIDAL_ANDROID/2.47.1 okhttp/4.9.0'
|
|
|
}
|
|
|
params = {'countryCode': 'US'}
|
|
|
else:
|
|
|
# For v2, use the standard session headers
|
|
|
headers = self.session.headers.copy()
|
|
|
# The v2 endpoint also requires the correct 'accept' header
|
|
|
headers['accept'] = 'application/vnd.api+json'
|
|
|
params = {}
|
|
|
|
|
|
response = requests.get(endpoint, headers=headers, params=params, timeout=10)
|
|
|
logger.info(f"User ID response: {response.status_code}")
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
logger.info(f"User data keys: {list(data.keys()) if isinstance(data, dict) else 'Not a dict'}")
|
|
|
|
|
|
# --- START OF CORRECTION ---
|
|
|
# Correctly parse the nested JSON structure from your example
|
|
|
user_id = None
|
|
|
if 'data' in data and isinstance(data['data'], dict):
|
|
|
user_id = data['data'].get('id')
|
|
|
|
|
|
# Fallback to the original checks for other API responses
|
|
|
if not user_id:
|
|
|
user_id = data.get('id') or data.get('userId') or data.get('uid') or data.get('user_id')
|
|
|
# --- END OF CORRECTION ---
|
|
|
|
|
|
if user_id:
|
|
|
logger.info(f"Found user ID: {user_id}")
|
|
|
return str(user_id), version
|
|
|
else:
|
|
|
logger.warning(f"No user ID found in response: {data}")
|
|
|
else:
|
|
|
logger.warning(f"Failed to get user ID: {response.status_code} - {response.text[:200]}")
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.warning(f"Error getting user ID from {version}: {e}")
|
|
|
continue
|
|
|
|
|
|
return None, None
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in _get_user_id: {e}")
|
|
|
return None, None
|
|
|
|
|
|
@rate_limited
|
|
|
def get_user_playlists_metadata_only(self):
|
|
|
"""Get user's playlists using the V2 filtered endpoint."""
|
|
|
try:
|
|
|
if not self._ensure_valid_token():
|
|
|
logger.error("Not authenticated with Tidal")
|
|
|
return []
|
|
|
|
|
|
# Step 1: Get the user ID, which is needed for the filter.
|
|
|
user_id, _ = self._get_user_id()
|
|
|
if not user_id:
|
|
|
logger.error("Could not retrieve Tidal User ID to fetch playlists.")
|
|
|
return []
|
|
|
|
|
|
logger.info(f"Using V2 endpoint to fetch playlists for user ID: {user_id}")
|
|
|
|
|
|
# Step 2: Construct the correct V2 endpoint and parameters.
|
|
|
# NOTE: We don't include 'items' here because the V2 API only includes ~20 tracks
|
|
|
# We'll fetch full track lists separately for each playlist
|
|
|
endpoint = f"{self.base_url}/playlists"
|
|
|
params = {
|
|
|
'countryCode': 'US',
|
|
|
'filter[owners.id]': user_id
|
|
|
}
|
|
|
|
|
|
headers = self.session.headers.copy()
|
|
|
headers['accept'] = 'application/vnd.api+json'
|
|
|
|
|
|
response = requests.get(endpoint, params=params, headers=headers, timeout=15)
|
|
|
|
|
|
if response.status_code != 200:
|
|
|
logger.error(f"Failed to fetch V2 playlists: {response.status_code} - {response.text}")
|
|
|
return []
|
|
|
|
|
|
data = response.json()
|
|
|
playlists = []
|
|
|
|
|
|
# Step 3: Process the playlists from the main 'data' array.
|
|
|
# Only extract metadata — tracks are fetched on-demand when the user
|
|
|
# selects a playlist to sync/mirror, not during the listing step.
|
|
|
for playlist_data in data.get('data', []):
|
|
|
attributes = playlist_data.get('attributes', {})
|
|
|
playlist_id = playlist_data.get('id')
|
|
|
|
|
|
# Extract image URL from relationships if available
|
|
|
image_url = None
|
|
|
try:
|
|
|
relationships = playlist_data.get('relationships', {})
|
|
|
image_rel = relationships.get('image', {}).get('data', {})
|
|
|
if image_rel:
|
|
|
# Image URL may be in included resources or constructed from ID
|
|
|
image_id = image_rel.get('id', '')
|
|
|
if image_id:
|
|
|
image_url = f"https://resources.tidal.com/images/{image_id.replace('-', '/')}/640x640.jpg"
|
|
|
except Exception as _e:
|
|
|
logger.debug("tidal v2 playlists image_url extract: %s", _e)
|
|
|
|
|
|
new_playlist = Playlist(
|
|
|
id=str(playlist_id),
|
|
|
name=attributes.get('name', 'Unknown Playlist'),
|
|
|
description=attributes.get('description', ''),
|
|
|
external_urls={'tidal': f"https://listen.tidal.com/playlist/{playlist_id}"},
|
|
|
public=attributes.get('accessType') == 'PUBLIC',
|
|
|
tracks=[], # Empty — fetched on-demand via get_playlist()
|
|
|
)
|
|
|
# Store track count from metadata (no API call needed)
|
|
|
# V2 API may use different field names depending on version
|
|
|
new_playlist.track_count = (
|
|
|
attributes.get('numberOfTracks') or
|
|
|
attributes.get('numberOfItems') or
|
|
|
attributes.get('totalNumberOfItems') or
|
|
|
attributes.get('nrOfTracks') or
|
|
|
0
|
|
|
)
|
|
|
if image_url:
|
|
|
new_playlist.image_url = image_url
|
|
|
|
|
|
playlists.append(new_playlist)
|
|
|
|
|
|
logger.info(f"Successfully retrieved {len(playlists)} playlists (metadata only) with the V2 filter method.")
|
|
|
return playlists
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"A critical error occurred while fetching Tidal V2 playlists: {e}")
|
|
|
return []
|
|
|
|
|
|
def _try_direct_playlist_endpoints(self):
|
|
|
"""Fallback method to try direct playlist endpoints without user ID"""
|
|
|
playlists = []
|
|
|
fallback_endpoints = [
|
|
|
(f"{self.alt_base_url}/my/playlists", "v1 fallback my playlists"),
|
|
|
(f"{self.base_url}/me/playlists", "v2 fallback me playlists"),
|
|
|
]
|
|
|
|
|
|
for endpoint, description in fallback_endpoints:
|
|
|
try:
|
|
|
logger.info(f"Fallback: trying {description}")
|
|
|
headers = {
|
|
|
'Accept': 'application/json',
|
|
|
'Authorization': f'Bearer {self.access_token}',
|
|
|
'User-Agent': 'TIDAL_ANDROID/2.47.1 okhttp/4.9.0'
|
|
|
} if "v1" in description else self.session.headers.copy()
|
|
|
|
|
|
response = requests.get(endpoint, headers=headers, params={'limit': 50}, timeout=10)
|
|
|
logger.info(f"Fallback response: {response.status_code}")
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
# Process response same as above
|
|
|
items = data.get('items', data.get('data', data if isinstance(data, list) else []))
|
|
|
if items:
|
|
|
for item in items:
|
|
|
playlist = Playlist(
|
|
|
id=item.get('id', item.get('uuid', 'unknown')),
|
|
|
name=item.get('title', item.get('name', 'Unknown Playlist')),
|
|
|
description=item.get('description', ''),
|
|
|
external_urls={'tidal': f"https://tidal.com/browse/playlist/{item.get('uuid', item.get('id'))}"},
|
|
|
public=not item.get('publicPlaylist', True)
|
|
|
)
|
|
|
playlists.append(playlist)
|
|
|
logger.info(f"Fallback retrieved {len(playlists)} playlists")
|
|
|
return playlists
|
|
|
except Exception as e:
|
|
|
logger.warning(f"Fallback error: {e}")
|
|
|
continue
|
|
|
|
|
|
logger.error("All Tidal playlist endpoints failed")
|
|
|
return playlists
|
|
|
|
|
|
|
|
|
|
|
|
@rate_limited
|
|
|
def search_tracks(self, query: str, limit: int = 10) -> List[Track]:
|
|
|
"""Search for tracks using Tidal's search API"""
|
|
|
try:
|
|
|
if not self._ensure_valid_token():
|
|
|
logger.error("Not authenticated with Tidal")
|
|
|
return []
|
|
|
|
|
|
from urllib.parse import quote
|
|
|
encoded_query = quote(query, safe='')
|
|
|
params = {
|
|
|
'countryCode': 'US',
|
|
|
'include': 'tracks',
|
|
|
'limit': limit
|
|
|
}
|
|
|
|
|
|
response = self.session.get(
|
|
|
f"{self.base_url}/searchResults/{encoded_query}",
|
|
|
params=params,
|
|
|
timeout=10
|
|
|
)
|
|
|
|
|
|
if response.status_code == 429:
|
|
|
raise Exception("Rate limited (429) on search_tracks")
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
tracks = []
|
|
|
|
|
|
# Handle V2 JSON:API response formats
|
|
|
items = []
|
|
|
if 'tracks' in data and isinstance(data['tracks'], list):
|
|
|
items = data['tracks']
|
|
|
elif 'tracks' in data and 'items' in data['tracks']:
|
|
|
items = data['tracks']['items']
|
|
|
elif 'included' in data:
|
|
|
items = [r for r in data['included'] if r.get('type') == 'tracks']
|
|
|
|
|
|
for item in items:
|
|
|
# Flatten JSON:API resource if needed
|
|
|
if 'attributes' in item and 'id' in item:
|
|
|
flat = dict(item['attributes'])
|
|
|
flat['id'] = item['id']
|
|
|
item = flat
|
|
|
track = self._parse_track_data(item)
|
|
|
if track:
|
|
|
tracks.append(track)
|
|
|
|
|
|
logger.info(f"Found {len(tracks)} Tidal tracks for query: '{query}'")
|
|
|
return tracks
|
|
|
else:
|
|
|
logger.error(f"Tidal search failed: {response.status_code} - {response.text}")
|
|
|
return []
|
|
|
|
|
|
except Exception as e:
|
|
|
if "429" in str(e):
|
|
|
raise # Let rate_limited decorator handle retry
|
|
|
logger.error(f"Error searching Tidal tracks: {e}")
|
|
|
return []
|
|
|
|
|
|
# ── Enrichment API Methods ──
|
|
|
|
|
|
@rate_limited
|
|
|
def search_artist(self, name: str) -> Optional[Dict]:
|
|
|
"""Search for an artist by name. Returns best matching result as raw dict or None."""
|
|
|
try:
|
|
|
if not self._ensure_valid_token():
|
|
|
return None
|
|
|
|
|
|
from urllib.parse import quote
|
|
|
from difflib import SequenceMatcher
|
|
|
encoded_query = quote(name, safe='')
|
|
|
params = {
|
|
|
'countryCode': 'US',
|
|
|
'include': 'artists',
|
|
|
}
|
|
|
|
|
|
response = self.session.get(
|
|
|
f"{self.base_url}/searchResults/{encoded_query}",
|
|
|
params=params,
|
|
|
timeout=10
|
|
|
)
|
|
|
|
|
|
if response.status_code == 429:
|
|
|
raise Exception("Rate limited (429) on search_artist")
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
# JSON:API format: included artists in 'artists' or nested in relationships
|
|
|
items = []
|
|
|
if 'artists' in data and isinstance(data['artists'], list):
|
|
|
items = data['artists']
|
|
|
elif 'artists' in data and 'items' in data['artists']:
|
|
|
items = data['artists']['items']
|
|
|
elif 'included' in data:
|
|
|
items = [r for r in data['included'] if r.get('type') == 'artists']
|
|
|
if items:
|
|
|
# Flatten all items and pick best name match
|
|
|
best_item = None
|
|
|
best_score = 0.0
|
|
|
for item in items:
|
|
|
if 'attributes' in item and 'id' in item:
|
|
|
flat = dict(item['attributes'])
|
|
|
flat['id'] = item['id']
|
|
|
else:
|
|
|
flat = item
|
|
|
item_name = flat.get('name', '')
|
|
|
score = SequenceMatcher(None, name.lower(), item_name.lower()).ratio()
|
|
|
if score > best_score:
|
|
|
best_score = score
|
|
|
best_item = flat
|
|
|
return best_item
|
|
|
else:
|
|
|
logger.debug(f"Tidal artist search failed: {response.status_code}")
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
if "429" in str(e):
|
|
|
raise # Let rate_limited decorator handle retry
|
|
|
logger.error(f"Error searching Tidal artist: {e}")
|
|
|
return None
|
|
|
|
|
|
@rate_limited
|
|
|
def search_album(self, artist: str, title: str) -> Optional[Dict]:
|
|
|
"""Search for an album by artist + title. Returns first result as raw dict or None."""
|
|
|
try:
|
|
|
if not self._ensure_valid_token():
|
|
|
return None
|
|
|
|
|
|
from urllib.parse import quote
|
|
|
query = f"{artist} {title}" if artist else title
|
|
|
encoded_query = quote(query, safe='')
|
|
|
params = {
|
|
|
'countryCode': 'US',
|
|
|
'include': 'albums',
|
|
|
}
|
|
|
|
|
|
response = self.session.get(
|
|
|
f"{self.base_url}/searchResults/{encoded_query}",
|
|
|
params=params,
|
|
|
timeout=10
|
|
|
)
|
|
|
|
|
|
if response.status_code == 429:
|
|
|
raise Exception("Rate limited (429) on search_album")
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
items = []
|
|
|
if 'albums' in data and isinstance(data['albums'], list):
|
|
|
items = data['albums']
|
|
|
elif 'albums' in data and 'items' in data['albums']:
|
|
|
items = data['albums']['items']
|
|
|
elif 'included' in data:
|
|
|
items = [r for r in data['included'] if r.get('type') == 'albums']
|
|
|
if items:
|
|
|
# Flatten all items and pick best title match
|
|
|
from difflib import SequenceMatcher
|
|
|
best_item = None
|
|
|
best_score = 0.0
|
|
|
for item in items:
|
|
|
if 'attributes' in item and 'id' in item:
|
|
|
flat = dict(item['attributes'])
|
|
|
flat['id'] = item['id']
|
|
|
# Preserve artist relationship for cross-verification
|
|
|
try:
|
|
|
rel_artists = item.get('relationships', {}).get('artists', {}).get('data', [])
|
|
|
if rel_artists:
|
|
|
flat['artist'] = {'id': rel_artists[0].get('id')}
|
|
|
except (AttributeError, IndexError, TypeError):
|
|
|
pass
|
|
|
else:
|
|
|
flat = item
|
|
|
item_title = flat.get('title', '')
|
|
|
score = SequenceMatcher(None, title.lower(), item_title.lower()).ratio()
|
|
|
if score > best_score:
|
|
|
best_score = score
|
|
|
best_item = flat
|
|
|
return best_item
|
|
|
else:
|
|
|
logger.debug(f"Tidal album search failed: {response.status_code}")
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
if "429" in str(e):
|
|
|
raise # Let rate_limited decorator handle retry
|
|
|
logger.error(f"Error searching Tidal album: {e}")
|
|
|
return None
|
|
|
|
|
|
@rate_limited
|
|
|
def search_track(self, artist: str, title: str) -> Optional[Dict]:
|
|
|
"""Search for a track by artist + title. Returns first result as raw dict or None."""
|
|
|
try:
|
|
|
if not self._ensure_valid_token():
|
|
|
return None
|
|
|
|
|
|
from urllib.parse import quote
|
|
|
query = f"{artist} {title}" if artist else title
|
|
|
encoded_query = quote(query, safe='')
|
|
|
params = {
|
|
|
'countryCode': 'US',
|
|
|
'include': 'tracks',
|
|
|
}
|
|
|
|
|
|
response = self.session.get(
|
|
|
f"{self.base_url}/searchResults/{encoded_query}",
|
|
|
params=params,
|
|
|
timeout=10
|
|
|
)
|
|
|
|
|
|
if response.status_code == 429:
|
|
|
raise Exception("Rate limited (429) on search_track")
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
items = []
|
|
|
if 'tracks' in data and isinstance(data['tracks'], list):
|
|
|
items = data['tracks']
|
|
|
elif 'tracks' in data and 'items' in data['tracks']:
|
|
|
items = data['tracks']['items']
|
|
|
elif 'included' in data:
|
|
|
items = [r for r in data['included'] if r.get('type') == 'tracks']
|
|
|
if items:
|
|
|
# Flatten all items and pick best title match
|
|
|
from difflib import SequenceMatcher
|
|
|
best_item = None
|
|
|
best_score = 0.0
|
|
|
for item in items:
|
|
|
if 'attributes' in item and 'id' in item:
|
|
|
flat = dict(item['attributes'])
|
|
|
flat['id'] = item['id']
|
|
|
# Preserve artist relationship for cross-verification
|
|
|
try:
|
|
|
rel_artists = item.get('relationships', {}).get('artists', {}).get('data', [])
|
|
|
if rel_artists:
|
|
|
flat['artist'] = {'id': rel_artists[0].get('id')}
|
|
|
except (AttributeError, IndexError, TypeError):
|
|
|
pass
|
|
|
else:
|
|
|
flat = item
|
|
|
item_title = flat.get('title', '')
|
|
|
score = SequenceMatcher(None, title.lower(), item_title.lower()).ratio()
|
|
|
if score > best_score:
|
|
|
best_score = score
|
|
|
best_item = flat
|
|
|
return best_item
|
|
|
else:
|
|
|
logger.debug(f"Tidal track search failed: {response.status_code}")
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
if "429" in str(e):
|
|
|
raise # Let rate_limited decorator handle retry
|
|
|
logger.error(f"Error searching Tidal track: {e}")
|
|
|
return None
|
|
|
|
|
|
@rate_limited
|
|
|
def get_artist(self, artist_id: str) -> Optional[Dict]:
|
|
|
"""Get full artist details by Tidal ID."""
|
|
|
try:
|
|
|
if not self._ensure_valid_token():
|
|
|
return None
|
|
|
|
|
|
response = self.session.get(
|
|
|
f"{self.base_url}/artists/{artist_id}",
|
|
|
params={'countryCode': 'US'},
|
|
|
headers={'accept': 'application/vnd.api+json'},
|
|
|
timeout=10
|
|
|
)
|
|
|
|
|
|
if response.status_code == 429:
|
|
|
raise Exception("Rate limited (429) on get_artist")
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
# Handle JSON:API format
|
|
|
if 'data' in data and 'attributes' in data.get('data', {}):
|
|
|
result = dict(data['data'].get('attributes', {}))
|
|
|
result['id'] = data['data'].get('id', artist_id)
|
|
|
return result
|
|
|
return data
|
|
|
else:
|
|
|
logger.debug(f"Tidal get_artist failed: {response.status_code}")
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
if "429" in str(e):
|
|
|
raise # Let rate_limited decorator handle retry
|
|
|
logger.error(f"Error getting Tidal artist {artist_id}: {e}")
|
|
|
return None
|
|
|
|
|
|
@rate_limited
|
|
|
def get_album(self, album_id: str) -> Optional[Dict]:
|
|
|
"""Get full album details by Tidal ID."""
|
|
|
try:
|
|
|
if not self._ensure_valid_token():
|
|
|
return None
|
|
|
|
|
|
response = self.session.get(
|
|
|
f"{self.base_url}/albums/{album_id}",
|
|
|
params={'countryCode': 'US'},
|
|
|
headers={'accept': 'application/vnd.api+json'},
|
|
|
timeout=10
|
|
|
)
|
|
|
|
|
|
if response.status_code == 429:
|
|
|
raise Exception("Rate limited (429) on get_album")
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
if 'data' in data and 'attributes' in data.get('data', {}):
|
|
|
result = dict(data['data'].get('attributes', {}))
|
|
|
result['id'] = data['data'].get('id', album_id)
|
|
|
return result
|
|
|
return data
|
|
|
else:
|
|
|
logger.debug(f"Tidal get_album failed: {response.status_code}")
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
if "429" in str(e):
|
|
|
raise # Let rate_limited decorator handle retry
|
|
|
logger.error(f"Error getting Tidal album {album_id}: {e}")
|
|
|
return None
|
|
|
|
|
|
@rate_limited
|
|
|
def get_album_tracks(self, album_id: str, limit: Optional[int] = None) -> List[Track]:
|
|
|
"""Fetch every track on an album with full artist + name + duration
|
|
|
metadata hydrated.
|
|
|
|
|
|
Two-phase: walk `/v2/albums/{id}/relationships/items?include=items`
|
|
|
cursor chain to enumerate track IDs (with their position metadata —
|
|
|
`meta.trackNumber` + `meta.volumeNumber` for multi-disc), then
|
|
|
feed the IDs through the existing `_get_tracks_batch` helper for
|
|
|
artist + album-name resolution.
|
|
|
|
|
|
Returns a list of `Track` dataclasses with `track_number` and
|
|
|
`disc_number` attached as ad-hoc attributes so callers that need
|
|
|
per-position info (download modal, virtual playlist build) can
|
|
|
read them. Backend `/api/discover/album/<source>/<album_id>`
|
|
|
serializes these to the same shape Spotify/Deezer return."""
|
|
|
if not self._ensure_valid_token():
|
|
|
return []
|
|
|
|
|
|
# Phase 1: enumerate track IDs + position metadata via cursor pagination.
|
|
|
# The relationship endpoint pages at 20 items by default. The `meta`
|
|
|
# dict on each ref carries `trackNumber` + `volumeNumber` (multi-disc).
|
|
|
track_meta_by_id: Dict[str, Dict[str, int]] = {}
|
|
|
track_ids: List[str] = []
|
|
|
next_path: Optional[str] = None
|
|
|
|
|
|
while True:
|
|
|
if next_path:
|
|
|
url = (next_path if next_path.startswith('http')
|
|
|
else f"https://openapi.tidal.com/v2{next_path}")
|
|
|
params = None
|
|
|
else:
|
|
|
url = f"{self.base_url}/albums/{album_id}/relationships/items"
|
|
|
params = {'countryCode': 'US', 'include': 'items'}
|
|
|
|
|
|
try:
|
|
|
resp = self.session.get(
|
|
|
url, params=params,
|
|
|
headers={'accept': 'application/vnd.api+json'},
|
|
|
timeout=15,
|
|
|
)
|
|
|
except Exception as e:
|
|
|
logger.debug(f"Tidal album-tracks page request failed: {e}")
|
|
|
break
|
|
|
|
|
|
if resp.status_code != 200:
|
|
|
if resp.status_code == 429:
|
|
|
raise Exception("Rate limited (429) on get_album_tracks")
|
|
|
logger.debug(
|
|
|
f"Tidal album-tracks page returned {resp.status_code}: {resp.text[:200]}"
|
|
|
)
|
|
|
break
|
|
|
|
|
|
try:
|
|
|
data = resp.json()
|
|
|
except ValueError:
|
|
|
break
|
|
|
|
|
|
for item in data.get('data', []):
|
|
|
if item.get('type') != 'tracks':
|
|
|
continue
|
|
|
tid = item.get('id')
|
|
|
if not tid:
|
|
|
continue
|
|
|
tid = str(tid)
|
|
|
meta = item.get('meta', {}) or {}
|
|
|
track_meta_by_id[tid] = {
|
|
|
'track_number': int(meta.get('trackNumber') or 0),
|
|
|
'disc_number': int(meta.get('volumeNumber') or 1),
|
|
|
}
|
|
|
track_ids.append(tid)
|
|
|
if limit is not None and len(track_ids) >= limit:
|
|
|
break
|
|
|
|
|
|
if limit is not None and len(track_ids) >= limit:
|
|
|
break
|
|
|
|
|
|
next_path = data.get('links', {}).get('next')
|
|
|
if not next_path:
|
|
|
break
|
|
|
|
|
|
time.sleep(0.3)
|
|
|
|
|
|
if not track_ids:
|
|
|
return []
|
|
|
|
|
|
# Phase 2: batch hydrate via existing helper (artists + album names).
|
|
|
# Annotate each Track with position metadata so callers can build the
|
|
|
# per-track-number payload the download pipeline expects.
|
|
|
hydrated: List[Track] = []
|
|
|
for i in range(0, len(track_ids), self._COLLECTION_BATCH_SIZE):
|
|
|
batch_ids = track_ids[i:i + self._COLLECTION_BATCH_SIZE]
|
|
|
try:
|
|
|
batch_tracks = self._get_tracks_batch(batch_ids)
|
|
|
except Exception as e:
|
|
|
logger.debug(f"Tidal album-tracks batch hydration failed: {e}")
|
|
|
continue
|
|
|
for t in batch_tracks:
|
|
|
meta = track_meta_by_id.get(str(t.id), {})
|
|
|
t.track_number = meta.get('track_number', 0)
|
|
|
t.disc_number = meta.get('disc_number', 1)
|
|
|
hydrated.append(t)
|
|
|
|
|
|
# Tidal's relationship walk returns tracks in album order; the
|
|
|
# batch endpoint may not preserve order. Sort by (disc, track)
|
|
|
# so the modal renders the album top-down.
|
|
|
hydrated.sort(key=lambda t: (
|
|
|
getattr(t, 'disc_number', 1),
|
|
|
getattr(t, 'track_number', 0),
|
|
|
))
|
|
|
logger.info(
|
|
|
f"Retrieved {len(hydrated)}/{len(track_ids)} tracks for Tidal album {album_id}"
|
|
|
)
|
|
|
return hydrated
|
|
|
|
|
|
@rate_limited
|
|
|
def get_track(self, track_id: str) -> Optional[Dict]:
|
|
|
"""Get full track details by Tidal ID."""
|
|
|
try:
|
|
|
if not self._ensure_valid_token():
|
|
|
return None
|
|
|
|
|
|
response = self.session.get(
|
|
|
f"{self.base_url}/tracks/{track_id}",
|
|
|
params={'countryCode': 'US'},
|
|
|
headers={'accept': 'application/vnd.api+json'},
|
|
|
timeout=10
|
|
|
)
|
|
|
|
|
|
if response.status_code == 429:
|
|
|
raise Exception("Rate limited (429) on get_track")
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
if 'data' in data and 'attributes' in data.get('data', {}):
|
|
|
result = dict(data['data'].get('attributes', {}))
|
|
|
result['id'] = data['data'].get('id', track_id)
|
|
|
return result
|
|
|
return data
|
|
|
else:
|
|
|
logger.debug(f"Tidal get_track failed: {response.status_code}")
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
if "429" in str(e):
|
|
|
raise # Let rate_limited decorator handle retry
|
|
|
logger.error(f"Error getting Tidal track {track_id}: {e}")
|
|
|
return None
|
|
|
|
|
|
@rate_limited
|
|
|
def get_playlist(self, playlist_id: str) -> Optional[Playlist]:
|
|
|
"""Get playlist details including tracks using JSON:API format.
|
|
|
|
|
|
Recognizes the virtual ``tidal-favorites`` ID and dispatches
|
|
|
to ``get_collection_tracks`` so every caller that already
|
|
|
accepts a playlist ID (mirror auto-refresh, discovery start,
|
|
|
per-playlist detail endpoint) gets Favorite Tracks support
|
|
|
for free without per-site special-casing.
|
|
|
|
|
|
ID intentionally has no colon — the sync-services.js renderer
|
|
|
builds CSS selectors via template literal interpolation
|
|
|
(``#tidal-card-${p.id} .playlist-card-track-count``) and a
|
|
|
``:`` in the ID would be parsed as a CSS pseudo-class operator.
|
|
|
"""
|
|
|
try:
|
|
|
if playlist_id == COLLECTION_PLAYLIST_ID:
|
|
|
collection_tracks = self.get_collection_tracks()
|
|
|
return Playlist(
|
|
|
id=COLLECTION_PLAYLIST_ID,
|
|
|
name=COLLECTION_PLAYLIST_NAME,
|
|
|
description=COLLECTION_PLAYLIST_DESCRIPTION,
|
|
|
tracks=collection_tracks,
|
|
|
owner={'name': 'You'},
|
|
|
public=False,
|
|
|
)
|
|
|
|
|
|
if not self._ensure_valid_token():
|
|
|
logger.error("Not authenticated with Tidal")
|
|
|
return None
|
|
|
|
|
|
# Get playlist metadata with JSON:API format
|
|
|
headers = {'accept': 'application/vnd.api+json'}
|
|
|
response = self.session.get(
|
|
|
f"{self.base_url}/playlists/{playlist_id}",
|
|
|
params={'countryCode': 'US'},
|
|
|
headers=headers,
|
|
|
timeout=10
|
|
|
)
|
|
|
|
|
|
response.raise_for_status()
|
|
|
|
|
|
if response.status_code != 200:
|
|
|
logger.error(f"Failed to get Tidal playlist {playlist_id}: {response.status_code} - {response.text}")
|
|
|
return None
|
|
|
|
|
|
# Parse JSON:API response structure
|
|
|
playlist_data = response.json().get("data", {})
|
|
|
playlist_attrs = playlist_data.get("attributes", {})
|
|
|
|
|
|
# Get playlist tracks with cursor-based pagination
|
|
|
tracks = []
|
|
|
cursor = None
|
|
|
total_fetched = 0
|
|
|
page_num = 0
|
|
|
consecutive_failures = 0
|
|
|
MAX_PAGE_RETRIES = 3
|
|
|
|
|
|
while True:
|
|
|
page_num += 1
|
|
|
|
|
|
# Rate limit between pagination pages (skip first page)
|
|
|
if page_num > 1:
|
|
|
time.sleep(1.0)
|
|
|
|
|
|
# Fetch a page of track IDs
|
|
|
try:
|
|
|
tracks_page = self._get_playlist_tracks_page(playlist_id, cursor)
|
|
|
except Exception as e:
|
|
|
error_str = str(e)
|
|
|
if "429" in error_str or "rate limit" in error_str.lower():
|
|
|
consecutive_failures += 1
|
|
|
if consecutive_failures <= MAX_PAGE_RETRIES:
|
|
|
backoff = 10.0 * consecutive_failures # 10s, 20s, 30s
|
|
|
logger.warning(f"Playlist pagination rate limited on page {page_num}, waiting {backoff}s (attempt {consecutive_failures}/{MAX_PAGE_RETRIES})")
|
|
|
time.sleep(backoff)
|
|
|
page_num -= 1 # Retry same page
|
|
|
continue
|
|
|
else:
|
|
|
logger.error(f"Playlist pagination failed after {MAX_PAGE_RETRIES} retries, returning {total_fetched} tracks fetched so far")
|
|
|
break
|
|
|
else:
|
|
|
logger.error(f"Error fetching playlist page {page_num}: {e}")
|
|
|
break
|
|
|
|
|
|
if not tracks_page or not tracks_page.get("data"):
|
|
|
logger.info("No more tracks found, stopping pagination")
|
|
|
break
|
|
|
|
|
|
# Reset failure counter on success
|
|
|
consecutive_failures = 0
|
|
|
|
|
|
# Extract track IDs from this page
|
|
|
track_ids = []
|
|
|
for item in tracks_page.get("data", []):
|
|
|
# In JSON:API, relationship items have both 'type' and 'id'
|
|
|
# The type should be 'tracks' but we'll be defensive
|
|
|
if item.get("type") and item.get("id"):
|
|
|
track_ids.append(item.get("id"))
|
|
|
|
|
|
if track_ids:
|
|
|
# Batch fetch full track details with artists and albums
|
|
|
try:
|
|
|
batch_tracks = self._get_tracks_batch(track_ids)
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error fetching track details for page {page_num}: {e}")
|
|
|
# Continue pagination — we lose this batch but can still get remaining
|
|
|
batch_tracks = []
|
|
|
|
|
|
if len(batch_tracks) < len(track_ids):
|
|
|
logger.warning(f"Page {page_num}: requested {len(track_ids)} tracks but only {len(batch_tracks)} returned (some may be unavailable in your region)")
|
|
|
|
|
|
tracks.extend(batch_tracks)
|
|
|
total_fetched += len(batch_tracks)
|
|
|
logger.info(f"Fetched {len(batch_tracks)} tracks in this batch, {total_fetched} total so far")
|
|
|
|
|
|
# Get next cursor from Tidal's response
|
|
|
# Tidal uses: links.meta.nextCursor (confirmed by PR #113)
|
|
|
cursor = tracks_page.get("links", {}).get("meta", {}).get("nextCursor")
|
|
|
|
|
|
# If no cursor found, pagination is complete
|
|
|
if not cursor:
|
|
|
logger.info("No next cursor found, pagination complete")
|
|
|
break
|
|
|
|
|
|
playlist = Playlist(
|
|
|
id=playlist_data.get('id', playlist_id),
|
|
|
name=playlist_attrs.get('name', 'Unknown Playlist'),
|
|
|
description=playlist_attrs.get('description', ''),
|
|
|
tracks=tracks,
|
|
|
external_urls={'tidal': f"https://listen.tidal.com/playlist/{playlist_id}"},
|
|
|
public=playlist_attrs.get('accessType', '') == "PUBLIC"
|
|
|
)
|
|
|
|
|
|
# Extract cover image URL from relationships (same logic as get_user_playlists_metadata_only)
|
|
|
try:
|
|
|
relationships = playlist_data.get('relationships', {})
|
|
|
image_rel = relationships.get('image', {}).get('data', {})
|
|
|
if image_rel:
|
|
|
image_id = image_rel.get('id', '')
|
|
|
if image_id:
|
|
|
playlist.image_url = f"https://resources.tidal.com/images/{image_id.replace('-', '/')}/640x640.jpg"
|
|
|
except Exception as _e:
|
|
|
logger.debug("tidal playlist image_url extract: %s", _e)
|
|
|
|
|
|
logger.info(f"Retrieved Tidal playlist '{playlist.name}' with {len(tracks)} tracks")
|
|
|
return playlist
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error getting Tidal playlist {playlist_id}: {e}")
|
|
|
return None
|
|
|
|
|
|
@rate_limited
|
|
|
def _get_playlist_tracks_page(self, playlist_id: str, cursor: Optional[str] = None) -> Optional[Dict[str, Any]]:
|
|
|
"""Fetch a page of track IDs from a playlist using cursor-based pagination"""
|
|
|
try:
|
|
|
params = {"countryCode": "US"}
|
|
|
if cursor:
|
|
|
params["page[cursor]"] = cursor
|
|
|
|
|
|
headers = {'accept': 'application/vnd.api+json'}
|
|
|
response = self.session.get(
|
|
|
f"{self.base_url}/playlists/{playlist_id}/relationships/items",
|
|
|
params=params,
|
|
|
headers=headers,
|
|
|
timeout=10
|
|
|
)
|
|
|
|
|
|
response.raise_for_status()
|
|
|
|
|
|
if response.status_code != 200:
|
|
|
logger.error(f"Failed to get playlist tracks page: {response.status_code} - {response.text}")
|
|
|
return None
|
|
|
|
|
|
return response.json()
|
|
|
|
|
|
except requests.exceptions.HTTPError:
|
|
|
raise # Let HTTP errors (429, 503, etc.) propagate to rate_limited decorator for retry
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error fetching playlist tracks page: {e}")
|
|
|
return None
|
|
|
|
|
|
@rate_limited
|
|
|
def _get_tracks_batch(self, track_ids: List[str]) -> List[Track]:
|
|
|
"""Batch fetch track details with artists and albums included"""
|
|
|
try:
|
|
|
if not track_ids:
|
|
|
return []
|
|
|
|
|
|
params = {
|
|
|
"countryCode": "US",
|
|
|
"include": "artists,albums",
|
|
|
"filter[id]": ",".join(track_ids)
|
|
|
}
|
|
|
|
|
|
headers = {'accept': 'application/vnd.api+json'}
|
|
|
response = self.session.get(
|
|
|
f"{self.base_url}/tracks",
|
|
|
params=params,
|
|
|
headers=headers,
|
|
|
timeout=10
|
|
|
)
|
|
|
|
|
|
response.raise_for_status()
|
|
|
|
|
|
if response.status_code != 200:
|
|
|
logger.error(f"Failed to get tracks batch: {response.status_code} - {response.text}")
|
|
|
return []
|
|
|
|
|
|
tracks_data = response.json()
|
|
|
|
|
|
# Build lookup caches for albums and artists from included data
|
|
|
album_cache: Dict[str, str] = {}
|
|
|
artist_cache: Dict[str, str] = {}
|
|
|
|
|
|
for item in tracks_data.get("included", []):
|
|
|
item_id = item.get("id")
|
|
|
item_type = item.get("type")
|
|
|
|
|
|
if item_type == "albums":
|
|
|
album_cache[item_id] = item.get("attributes", {}).get("title", "Unknown Album")
|
|
|
elif item_type == "artists":
|
|
|
artist_cache[item_id] = item.get("attributes", {}).get("name", "Unknown Artist")
|
|
|
|
|
|
# Parse tracks and hydrate with artist/album data
|
|
|
hydrated_tracks: List[Track] = []
|
|
|
|
|
|
for track_data in tracks_data.get("data", []):
|
|
|
attrs = track_data.get("attributes", {})
|
|
|
track_id = track_data.get("id")
|
|
|
relationships = track_data.get("relationships", {})
|
|
|
|
|
|
# Get album name from cache
|
|
|
album_data = relationships.get("albums", {}).get("data", [])
|
|
|
album_id = album_data[0].get("id") if album_data else None
|
|
|
album = album_cache.get(album_id, "Unknown Album")
|
|
|
|
|
|
# Get artist names from cache
|
|
|
artist_data_list = relationships.get("artists", {}).get("data", [])
|
|
|
artists = [
|
|
|
artist_cache.get(artist_ref.get("id"), "Unknown Artist")
|
|
|
for artist_ref in artist_data_list
|
|
|
if artist_ref.get("id")
|
|
|
]
|
|
|
|
|
|
if not artists:
|
|
|
artists = ["Unknown Artist"]
|
|
|
|
|
|
# Parse duration (ISO-8601 format like 'PT3M36S')
|
|
|
duration_ms = self._parse_iso_duration(attrs.get('duration', ''))
|
|
|
|
|
|
# Append version info (e.g. "BMotion Remix") to title if present
|
|
|
track_title = attrs.get('title', 'Unknown Track')
|
|
|
track_version = attrs.get('version') or ''
|
|
|
if track_version and track_version.lower() not in track_title.lower():
|
|
|
track_title = f"{track_title} ({track_version})"
|
|
|
|
|
|
hydrated_tracks.append(Track(
|
|
|
id=str(track_id),
|
|
|
name=track_title,
|
|
|
artists=artists,
|
|
|
album=album,
|
|
|
duration_ms=duration_ms,
|
|
|
external_urls={'tidal': f"https://listen.tidal.com/track/{track_id}"},
|
|
|
explicit=attrs.get('explicit', False)
|
|
|
))
|
|
|
|
|
|
return hydrated_tracks
|
|
|
|
|
|
except requests.exceptions.HTTPError:
|
|
|
raise # Let HTTP errors (429, 503, etc.) propagate to rate_limited decorator for retry
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error getting tracks batch: {e}")
|
|
|
return []
|
|
|
|
|
|
def _parse_iso_duration(self, duration: str) -> int:
|
|
|
"""Convert ISO-8601 duration string (e.g., 'PT3M36S' or 'PT1H30M45S') to milliseconds"""
|
|
|
if not duration or not duration.startswith("PT"):
|
|
|
return 0
|
|
|
|
|
|
total_seconds = 0
|
|
|
|
|
|
# Extract hours, minutes, and seconds using regex
|
|
|
hours_match = re.search(r"(\d+)H", duration)
|
|
|
minutes_match = re.search(r"(\d+)M", duration)
|
|
|
seconds_match = re.search(r"(\d+)S", duration)
|
|
|
|
|
|
if hours_match:
|
|
|
total_seconds += int(hours_match.group(1)) * 3600
|
|
|
if minutes_match:
|
|
|
total_seconds += int(minutes_match.group(1)) * 60
|
|
|
if seconds_match:
|
|
|
total_seconds += int(seconds_match.group(1))
|
|
|
|
|
|
return total_seconds * 1000
|
|
|
|
|
|
def _parse_track_data(self, item: Dict[str, Any]) -> Optional[Track]:
|
|
|
"""Parse Tidal track data into Track object"""
|
|
|
try:
|
|
|
track_id = item.get('id')
|
|
|
if not track_id:
|
|
|
return None
|
|
|
|
|
|
# Extract artist names
|
|
|
artists = []
|
|
|
if 'artists' in item:
|
|
|
artists = [artist.get('name', 'Unknown') for artist in item['artists']]
|
|
|
elif 'artist' in item:
|
|
|
artists = [item['artist'].get('name', 'Unknown')]
|
|
|
|
|
|
# Append version info (e.g. "Bloom remix") to title if present
|
|
|
track_title = item.get('title', 'Unknown Track')
|
|
|
track_version = item.get('version') or ''
|
|
|
if track_version and track_version.lower() not in track_title.lower():
|
|
|
track_title = f"{track_title} ({track_version})"
|
|
|
|
|
|
track = Track(
|
|
|
id=str(track_id),
|
|
|
name=track_title,
|
|
|
artists=artists,
|
|
|
album=item.get('album', {}).get('title', 'Unknown Album'),
|
|
|
duration_ms=item.get('duration', 0) * 1000, # Convert seconds to ms
|
|
|
external_urls={'tidal': f"https://tidal.com/browse/track/{track_id}"},
|
|
|
explicit=item.get('explicit', False)
|
|
|
)
|
|
|
|
|
|
return track
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error parsing Tidal track data: {e}")
|
|
|
return None
|
|
|
|
|
|
def get_user_info(self) -> Optional[Dict[str, Any]]:
|
|
|
"""Get current user information"""
|
|
|
try:
|
|
|
if not self._ensure_valid_token():
|
|
|
logger.error("Not authenticated with Tidal")
|
|
|
return None
|
|
|
return {
|
|
|
'display_name': 'Tidal User',
|
|
|
'id': 'tidal_user',
|
|
|
'type': 'user'
|
|
|
}
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error getting Tidal user info: {e}")
|
|
|
return None
|
|
|
|
|
|
# `get_favorite_artists` and `get_favorite_albums` were defined here
|
|
|
# against the legacy `/v2/favorites?filter[type]=...` endpoint with a
|
|
|
# V1 fallback. Both paths are dead in 2026: V2 returns 404 for
|
|
|
# personal favorites (it's scoped to third-party-app-created
|
|
|
# collections only), and V1 returns 403 because modern OAuth tokens
|
|
|
# carry `collection.read` instead of the legacy `r_usr` scope V1
|
|
|
# demands. Replaced by the V2 user-collection endpoints below — see
|
|
|
# the "Favorited albums + artists" section near the end of this class.
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
# User Collection ("Favorite Tracks" — Tidal calls this "My Collection")
|
|
|
# ------------------------------------------------------------------
|
|
|
#
|
|
|
# Tidal V2 exposes the user's favorited tracks via a separate
|
|
|
# cursor-paginated endpoint:
|
|
|
#
|
|
|
# GET /v2/userCollectionTracks/me/relationships/items
|
|
|
# ?countryCode=US&locale=en-US&include=items
|
|
|
#
|
|
|
# Each page returns up to 20 entries in `data[]` (track refs with
|
|
|
# `id` + `type='tracks'` + `meta.addedAt`) and an OPTIONAL `links.next`
|
|
|
# URL for the next page. The included track resources only carry the
|
|
|
# track-level attributes (title, isrc, duration, mediaTags) — artists
|
|
|
# and album NAMES come back as relationship-link stubs only, not
|
|
|
# embedded data.
|
|
|
#
|
|
|
# We split the work in two phases:
|
|
|
# 1) `_iter_collection_track_ids()` — enumerates every collection
|
|
|
# entry by following the cursor chain. Cheap (just IDs + types).
|
|
|
# 2) Hydration — feeds the IDs through the existing
|
|
|
# `_get_tracks_batch` helper which already knows how to
|
|
|
# `include=artists,albums` and produce fully-populated `Track`
|
|
|
# objects matching the rest of the codebase. No new parsing,
|
|
|
# no new dataclass shape, no duplication of the JSON:API parse.
|
|
|
#
|
|
|
# Reference: https://github.com/Nezreka/SoulSync/issues/502 — reporter
|
|
|
# Yug1900 located the working endpoint after the prior `/v2/favorites`
|
|
|
# filter approach returned empty data for personal favorites.
|
|
|
#
|
|
|
# Auth state — calling code (web_server.py listing endpoint) needs
|
|
|
# to know whether an empty result means "user has no favorites" or
|
|
|
# "token lacks `collection.read` scope and needs reconnect". The
|
|
|
# iter helper sets `self._collection_needs_reconnect = True` when
|
|
|
# the endpoint returns 401/403 so the listing endpoint can surface
|
|
|
# a user-actionable hint instead of silently hiding the row.
|
|
|
|
|
|
_COLLECTION_TRACKS_PATH = "userCollectionTracks/me/relationships/items"
|
|
|
_COLLECTION_ALBUMS_PATH = "userCollectionAlbums/me/relationships/items"
|
|
|
_COLLECTION_ARTISTS_PATH = "userCollectionArtists/me/relationships/items"
|
|
|
_COLLECTION_BATCH_SIZE = 20 # Tidal `filter[id]` page cap
|
|
|
|
|
|
@rate_limited
|
|
|
def _iter_collection_resource_ids(self, path: str, expected_type: str,
|
|
|
max_ids: Optional[int] = None) -> List[str]:
|
|
|
"""Walk a cursor-paginated collection endpoint and return the
|
|
|
list of resource IDs (tracks / albums / artists).
|
|
|
|
|
|
Generic across all three favorited-resource endpoints — the
|
|
|
only differences between them are the path segment and the
|
|
|
``type`` field on each ``data[]`` entry. Pagination, auth,
|
|
|
scope-failure detection, and the diagnostic logging are
|
|
|
identical.
|
|
|
|
|
|
``max_ids`` caps the walk early. Returns ``[]`` when not
|
|
|
authenticated or when the endpoint refuses (e.g. token without
|
|
|
``collection.read`` scope). On 401/403 also flips
|
|
|
``self._collection_needs_reconnect = True`` so the caller can
|
|
|
distinguish 'empty collection' from 'missing scope'."""
|
|
|
# Reset on every call so a successful walk clears any stale flag
|
|
|
# left from a prior failed attempt.
|
|
|
self._collection_needs_reconnect = False
|
|
|
|
|
|
if not self._ensure_valid_token():
|
|
|
logger.debug("Tidal not authenticated — cannot fetch collection %s", expected_type)
|
|
|
return []
|
|
|
|
|
|
ids: List[str] = []
|
|
|
next_path: Optional[str] = None
|
|
|
|
|
|
while True:
|
|
|
if next_path:
|
|
|
# `links.next` from Tidal is path-relative to /v2 root,
|
|
|
# already carries every query param we need (cursor + countryCode + locale + include).
|
|
|
url = next_path if next_path.startswith('http') else f"https://openapi.tidal.com/v2{next_path}"
|
|
|
params = None
|
|
|
else:
|
|
|
url = f"{self.base_url}/{path}"
|
|
|
params = {
|
|
|
'countryCode': 'US',
|
|
|
'locale': 'en-US',
|
|
|
'include': 'items',
|
|
|
}
|
|
|
|
|
|
try:
|
|
|
headers = {
|
|
|
'accept': 'application/vnd.api+json',
|
|
|
'Authorization': f'Bearer {self.access_token}',
|
|
|
}
|
|
|
logger.info(
|
|
|
f"Tidal collection request: GET {url} (params={params})"
|
|
|
)
|
|
|
resp = requests.get(url, params=params, headers=headers, timeout=15)
|
|
|
logger.info(
|
|
|
f"Tidal collection response: status={resp.status_code} "
|
|
|
f"body[:300]={resp.text[:300]!r}"
|
|
|
)
|
|
|
except Exception as e:
|
|
|
logger.warning(f"Tidal collection page request failed: {e}")
|
|
|
break
|
|
|
|
|
|
if resp.status_code != 200:
|
|
|
# 401/403 = scope/permission issue. Token predates the
|
|
|
# `collection.read` scope expansion or the user revoked
|
|
|
# it — flag for the UI hint and bail.
|
|
|
if resp.status_code in (401, 403):
|
|
|
self._collection_needs_reconnect = True
|
|
|
logger.info(
|
|
|
"Tidal collection endpoint returned %s — reconnect Tidal "
|
|
|
"in Settings → Connections to grant `collection.read` scope.",
|
|
|
resp.status_code,
|
|
|
)
|
|
|
else:
|
|
|
logger.warning(
|
|
|
f"Tidal collection page returned {resp.status_code}: {resp.text[:500]}"
|
|
|
)
|
|
|
break
|
|
|
|
|
|
try:
|
|
|
data = resp.json()
|
|
|
except ValueError as e:
|
|
|
logger.debug(f"Tidal collection response not JSON: {e}")
|
|
|
break
|
|
|
|
|
|
for item in data.get('data', []):
|
|
|
if item.get('type') != expected_type:
|
|
|
continue
|
|
|
rid = item.get('id')
|
|
|
if rid:
|
|
|
ids.append(str(rid))
|
|
|
if max_ids is not None and len(ids) >= max_ids:
|
|
|
return ids
|
|
|
|
|
|
next_path = data.get('links', {}).get('next')
|
|
|
if not next_path:
|
|
|
break
|
|
|
|
|
|
time.sleep(0.3) # Cursor pagination courtesy delay
|
|
|
|
|
|
return ids
|
|
|
|
|
|
def _iter_collection_track_ids(self, max_ids: Optional[int] = None) -> List[str]:
|
|
|
"""Favorited tracks — thin wrapper over the generic walker."""
|
|
|
return self._iter_collection_resource_ids(
|
|
|
self._COLLECTION_TRACKS_PATH, 'tracks', max_ids,
|
|
|
)
|
|
|
|
|
|
def _iter_collection_album_ids(self, max_ids: Optional[int] = None) -> List[str]:
|
|
|
"""Favorited albums — thin wrapper over the generic walker."""
|
|
|
return self._iter_collection_resource_ids(
|
|
|
self._COLLECTION_ALBUMS_PATH, 'albums', max_ids,
|
|
|
)
|
|
|
|
|
|
def _iter_collection_artist_ids(self, max_ids: Optional[int] = None) -> List[str]:
|
|
|
"""Favorited artists — thin wrapper over the generic walker."""
|
|
|
return self._iter_collection_resource_ids(
|
|
|
self._COLLECTION_ARTISTS_PATH, 'artists', max_ids,
|
|
|
)
|
|
|
|
|
|
def collection_needs_reconnect(self) -> bool:
|
|
|
"""True when the most recent collection fetch hit a 401/403 —
|
|
|
i.e. the saved token doesn't have `collection.read` scope and
|
|
|
the user needs to reconnect Tidal in Settings → Connections.
|
|
|
|
|
|
Reset to False at the start of every `_iter_collection_track_ids`
|
|
|
call so a successful walk clears stale flags."""
|
|
|
return getattr(self, '_collection_needs_reconnect', False)
|
|
|
|
|
|
def get_collection_tracks_count(self) -> int:
|
|
|
"""Total count of tracks in the user's Favorite Tracks.
|
|
|
|
|
|
Tidal's cursor pagination doesn't expose a `meta.total` so we
|
|
|
walk the full ID list. Cheap relative to hydration (one small
|
|
|
request per 20 tracks, no per-track lookups), but linear in
|
|
|
collection size — call sparingly."""
|
|
|
try:
|
|
|
return len(self._iter_collection_track_ids())
|
|
|
except Exception as e:
|
|
|
logger.debug(f"Failed to get Tidal collection tracks count: {e}")
|
|
|
return 0
|
|
|
|
|
|
def get_collection_tracks(self, limit: Optional[int] = None) -> List[Track]:
|
|
|
"""Fetch user's favorited tracks with full artist + album
|
|
|
metadata hydrated via `_get_tracks_batch`.
|
|
|
|
|
|
Returns the same `Track` shape every other Tidal playlist
|
|
|
method returns, so the virtual-playlist plumbing in
|
|
|
`web_server.py` can reuse the existing serialization path."""
|
|
|
try:
|
|
|
track_ids = self._iter_collection_track_ids(max_ids=limit)
|
|
|
if not track_ids:
|
|
|
return []
|
|
|
|
|
|
hydrated: List[Track] = []
|
|
|
for i in range(0, len(track_ids), self._COLLECTION_BATCH_SIZE):
|
|
|
batch = track_ids[i:i + self._COLLECTION_BATCH_SIZE]
|
|
|
try:
|
|
|
hydrated.extend(self._get_tracks_batch(batch))
|
|
|
except Exception as e:
|
|
|
logger.debug(
|
|
|
f"Tidal collection batch hydration failed for IDs {batch}: {e}"
|
|
|
)
|
|
|
|
|
|
logger.info(
|
|
|
f"Retrieved {len(hydrated)}/{len(track_ids)} tracks from Tidal Favorite Tracks"
|
|
|
)
|
|
|
return hydrated
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error fetching Tidal collection tracks: {e}")
|
|
|
return []
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
# Favorited albums + artists — V2 collection endpoints
|
|
|
# ------------------------------------------------------------------
|
|
|
#
|
|
|
# Same problem the tracks side hit on issue #502: the prior
|
|
|
# `/v2/favorites?filter[type]=ALBUMS|ARTISTS` endpoints are
|
|
|
# deprecated (404) and the V1 fallback (`/v1/users/<id>/favorites/
|
|
|
# albums|artists`) returns 403 because modern OAuth tokens with
|
|
|
# `collection.read` scope don't have the legacy `r_usr` scope V1
|
|
|
# requires. Discord-reported symptom: Discover → Your Albums (and
|
|
|
# Your Artists) section shows nothing for Tidal users regardless
|
|
|
# of how many albums/artists they've favorited.
|
|
|
#
|
|
|
# Fix mirrors the tracks path:
|
|
|
# 1) Cursor-walk `/v2/userCollection{Albums|Artists}/me/relationships/items`
|
|
|
# via `_iter_collection_album_ids` / `_iter_collection_artist_ids`
|
|
|
# (lifted into the generic `_iter_collection_resource_ids` helper).
|
|
|
# 2) Batch-hydrate via `/v2/{albums|artists}?filter[id]=...&include=...`
|
|
|
# with single-request fan-out (artists+coverArt for albums,
|
|
|
# profileArt for artists). Parses JSON:API `included[]` for
|
|
|
# artist names + image URLs.
|
|
|
#
|
|
|
# Public surface preserves the existing return shape — list of
|
|
|
# dicts matching what `database.upsert_liked_album` /
|
|
|
# `upsert_liked_artist` consume — so web_server.py callers
|
|
|
# (`/api/discover/your-albums-fetch` and equivalent) stay
|
|
|
# byte-identical.
|
|
|
|
|
|
@rate_limited
|
|
|
def _get_albums_batch(self, album_ids: List[str]) -> List[Dict[str, Any]]:
|
|
|
"""Batch-fetch album metadata + cover art + artist names in
|
|
|
one request via JSON:API extended-include semantics. Returns
|
|
|
list of dicts matching `database.upsert_liked_album` kwargs."""
|
|
|
if not album_ids:
|
|
|
return []
|
|
|
try:
|
|
|
params = {
|
|
|
'countryCode': 'US',
|
|
|
'include': 'artists,coverArt',
|
|
|
'filter[id]': ','.join(album_ids),
|
|
|
}
|
|
|
headers = {'accept': 'application/vnd.api+json'}
|
|
|
resp = self.session.get(
|
|
|
f"{self.base_url}/albums",
|
|
|
params=params, headers=headers, timeout=15,
|
|
|
)
|
|
|
if resp.status_code != 200:
|
|
|
logger.debug(
|
|
|
f"Tidal albums batch returned {resp.status_code}: {resp.text[:200]}"
|
|
|
)
|
|
|
return []
|
|
|
|
|
|
data = resp.json()
|
|
|
artists_by_id, artworks_by_id = self._build_included_maps(data.get('included', []))
|
|
|
|
|
|
results: List[Dict[str, Any]] = []
|
|
|
for item in data.get('data', []):
|
|
|
if item.get('type') != 'albums':
|
|
|
continue
|
|
|
attrs = item.get('attributes', {})
|
|
|
rels = item.get('relationships', {})
|
|
|
results.append({
|
|
|
'tidal_id': str(item.get('id', '')),
|
|
|
'album_name': attrs.get('title', '') or '',
|
|
|
'artist_name': self._first_artist_name(rels, artists_by_id),
|
|
|
'image_url': self._first_artwork_url(rels.get('coverArt', {}), artworks_by_id),
|
|
|
'release_date': attrs.get('releaseDate', '') or '',
|
|
|
'total_tracks': int(attrs.get('numberOfItems') or 0),
|
|
|
})
|
|
|
return results
|
|
|
except Exception as e:
|
|
|
logger.debug(f"Tidal _get_albums_batch error: {e}")
|
|
|
return []
|
|
|
|
|
|
@rate_limited
|
|
|
def _get_artists_batch(self, artist_ids: List[str]) -> List[Dict[str, Any]]:
|
|
|
"""Batch-fetch artist metadata + profile image. Returns list
|
|
|
of dicts matching the prior `get_favorite_artists` shape
|
|
|
(`tidal_id`, `name`, `image_url`)."""
|
|
|
if not artist_ids:
|
|
|
return []
|
|
|
try:
|
|
|
params = {
|
|
|
'countryCode': 'US',
|
|
|
'include': 'profileArt',
|
|
|
'filter[id]': ','.join(artist_ids),
|
|
|
}
|
|
|
headers = {'accept': 'application/vnd.api+json'}
|
|
|
resp = self.session.get(
|
|
|
f"{self.base_url}/artists",
|
|
|
params=params, headers=headers, timeout=15,
|
|
|
)
|
|
|
if resp.status_code != 200:
|
|
|
logger.debug(
|
|
|
f"Tidal artists batch returned {resp.status_code}: {resp.text[:200]}"
|
|
|
)
|
|
|
return []
|
|
|
|
|
|
data = resp.json()
|
|
|
_, artworks_by_id = self._build_included_maps(data.get('included', []))
|
|
|
|
|
|
results: List[Dict[str, Any]] = []
|
|
|
for item in data.get('data', []):
|
|
|
if item.get('type') != 'artists':
|
|
|
continue
|
|
|
attrs = item.get('attributes', {})
|
|
|
rels = item.get('relationships', {})
|
|
|
results.append({
|
|
|
'tidal_id': str(item.get('id', '')),
|
|
|
'name': attrs.get('name', '') or '',
|
|
|
'image_url': self._first_artwork_url(rels.get('profileArt', {}), artworks_by_id),
|
|
|
})
|
|
|
return results
|
|
|
except Exception as e:
|
|
|
logger.debug(f"Tidal _get_artists_batch error: {e}")
|
|
|
return []
|
|
|
|
|
|
@staticmethod
|
|
|
def _build_included_maps(included: List[Dict[str, Any]]):
|
|
|
"""Index a JSON:API `included[]` array by resource type so the
|
|
|
per-resource lookup in batch-hydrate is O(1) per relationship
|
|
|
ref rather than O(n)."""
|
|
|
artists_by_id: Dict[str, Dict[str, Any]] = {}
|
|
|
artworks_by_id: Dict[str, Dict[str, Any]] = {}
|
|
|
for inc in included:
|
|
|
inc_id = str(inc.get('id', ''))
|
|
|
if not inc_id:
|
|
|
continue
|
|
|
inc_type = inc.get('type')
|
|
|
if inc_type == 'artists':
|
|
|
artists_by_id[inc_id] = inc
|
|
|
elif inc_type == 'artworks':
|
|
|
artworks_by_id[inc_id] = inc
|
|
|
return artists_by_id, artworks_by_id
|
|
|
|
|
|
@staticmethod
|
|
|
def _first_artist_name(relationships: Dict[str, Any],
|
|
|
artists_by_id: Dict[str, Dict[str, Any]]) -> str:
|
|
|
"""Resolve the primary artist name from a relationships block
|
|
|
+ included-artists map. Returns '' if not resolvable so the
|
|
|
upsert path doesn't trip on None."""
|
|
|
artist_refs = relationships.get('artists', {}).get('data', [])
|
|
|
if not artist_refs:
|
|
|
return ''
|
|
|
first_id = str(artist_refs[0].get('id', ''))
|
|
|
artist_obj = artists_by_id.get(first_id, {})
|
|
|
return artist_obj.get('attributes', {}).get('name', '') or ''
|
|
|
|
|
|
@staticmethod
|
|
|
def _first_artwork_url(artwork_relationship: Dict[str, Any],
|
|
|
artworks_by_id: Dict[str, Dict[str, Any]]) -> Optional[str]:
|
|
|
"""Resolve the largest cover/profile image URL from an artwork
|
|
|
relationship + included-artworks map. Tidal returns files
|
|
|
largest-first so picking files[0] gets the highest-resolution
|
|
|
variant (typically 1280×1280)."""
|
|
|
refs = artwork_relationship.get('data', [])
|
|
|
if not refs:
|
|
|
return None
|
|
|
first_id = str(refs[0].get('id', ''))
|
|
|
artwork = artworks_by_id.get(first_id, {})
|
|
|
files = artwork.get('attributes', {}).get('files', [])
|
|
|
if not files:
|
|
|
return None
|
|
|
return files[0].get('href')
|
|
|
|
|
|
def get_favorite_albums(self, limit: int = 200) -> List[Dict[str, Any]]:
|
|
|
"""Fetch user's favorited albums via the V2 user-collection
|
|
|
endpoint. Replaces the prior `/v2/favorites` + V1-fallback
|
|
|
path which is now dead (V2 endpoint deprecated, V1 returns
|
|
|
403 for modern OAuth tokens lacking `r_usr` scope).
|
|
|
|
|
|
Returns list of dicts matching `database.upsert_liked_album`
|
|
|
kwargs — the discover.py 'Your Albums' aggregator iterates
|
|
|
these and writes them to the liked_albums table."""
|
|
|
try:
|
|
|
album_ids = self._iter_collection_album_ids(max_ids=limit)
|
|
|
if not album_ids:
|
|
|
return []
|
|
|
|
|
|
results: List[Dict[str, Any]] = []
|
|
|
for i in range(0, len(album_ids), self._COLLECTION_BATCH_SIZE):
|
|
|
batch = album_ids[i:i + self._COLLECTION_BATCH_SIZE]
|
|
|
results.extend(self._get_albums_batch(batch))
|
|
|
|
|
|
logger.info(
|
|
|
f"Retrieved {len(results)}/{len(album_ids)} favorite albums from Tidal"
|
|
|
)
|
|
|
return results
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error fetching Tidal favorite albums: {e}")
|
|
|
return []
|
|
|
|
|
|
def get_favorite_artists(self, limit: int = 200) -> List[Dict[str, Any]]:
|
|
|
"""Fetch user's favorited artists via the V2 user-collection
|
|
|
endpoint. Replaces the prior `/v2/favorites` + V1-fallback
|
|
|
path (dead for the same reason as `get_favorite_albums`).
|
|
|
|
|
|
Returns list of dicts matching the prior shape (`tidal_id`,
|
|
|
`name`, `image_url`) so web_server.py's `/api/discover/
|
|
|
your-artists-fetch` aggregator path stays byte-identical."""
|
|
|
try:
|
|
|
artist_ids = self._iter_collection_artist_ids(max_ids=limit)
|
|
|
if not artist_ids:
|
|
|
return []
|
|
|
|
|
|
results: List[Dict[str, Any]] = []
|
|
|
for i in range(0, len(artist_ids), self._COLLECTION_BATCH_SIZE):
|
|
|
batch = artist_ids[i:i + self._COLLECTION_BATCH_SIZE]
|
|
|
results.extend(self._get_artists_batch(batch))
|
|
|
|
|
|
logger.info(
|
|
|
f"Retrieved {len(results)}/{len(artist_ids)} favorite artists from Tidal"
|
|
|
)
|
|
|
return results
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error fetching Tidal favorite artists: {e}")
|
|
|
return []
|
|
|
|
|
|
# Global instance
|
|
|
_tidal_client = None
|
|
|
|
|
|
def get_tidal_client() -> TidalClient:
|
|
|
"""Get global Tidal client instance"""
|
|
|
global _tidal_client
|
|
|
if _tidal_client is None:
|
|
|
_tidal_client = TidalClient()
|
|
|
return _tidal_client |