You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/discogs_client.py

756 lines
29 KiB

"""
Discogs API client for music metadata enrichment.
Follows the same pattern as iTunesClient/DeezerClient — returns data
via the shared Track/Artist/Album dataclasses so all sources are interchangeable.
Rate limits: 25 req/min unauthenticated, 60 req/min with personal token.
API docs: https://www.discogs.com/developers
"""
import re
import time
import threading
import requests
from core.metadata_cache import get_metadata_cache
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from functools import wraps
from utils.logging_config import get_logger
logger = get_logger("discogs_client")
# Global rate limiting
_last_api_call_time = 0
_api_call_lock = threading.Lock()
MIN_API_INTERVAL = 2.5 # 25 req/min unauth = 1 call per 2.4s, padded to 2.5s
MIN_API_INTERVAL_AUTH = 1.0 # 60 req/min auth = 1 call per 1.0s
_is_authenticated = False
def rate_limited(func):
"""Decorator to enforce rate limiting on Discogs API calls."""
@wraps(func)
def wrapper(*args, **kwargs):
global _last_api_call_time
interval = MIN_API_INTERVAL_AUTH if _is_authenticated else MIN_API_INTERVAL
with _api_call_lock:
current_time = time.time()
time_since_last_call = current_time - _last_api_call_time
if time_since_last_call < interval:
sleep_time = interval - time_since_last_call
time.sleep(sleep_time)
_last_api_call_time = time.time()
from core.api_call_tracker import api_call_tracker
api_call_tracker.record_call('discogs')
try:
result = func(*args, **kwargs)
return result
except Exception as e:
if "429" in str(e):
logger.warning(f"Discogs rate limit hit, backing off: {e}")
time.sleep(30)
raise e
return wrapper
# --- Shared dataclasses (same shape as iTunes/Deezer/Spotify) ---
@dataclass
class Track:
id: str
name: str
artists: List[str]
album: str
duration_ms: int
popularity: int
preview_url: Optional[str] = None
external_urls: Optional[Dict[str, str]] = None
image_url: Optional[str] = None
release_date: Optional[str] = None
track_number: Optional[int] = None
disc_number: Optional[int] = None
album_type: Optional[str] = None
total_tracks: Optional[int] = None
@classmethod
def from_discogs_track(cls, track_data: Dict[str, Any], release_data: Dict[str, Any] = None) -> 'Track':
"""Build Track from Discogs release tracklist entry + parent release."""
release = release_data or {}
# Parse position (e.g., "A1", "B2", "1", "2-3")
position = track_data.get('position', '')
track_number = None
disc_number = 1
if position:
# Handle "1-3" (disc-track) format
if '-' in position and position.replace('-', '').isdigit():
parts = position.split('-')
disc_number = int(parts[0])
track_number = int(parts[1])
elif position.isdigit():
track_number = int(position)
else:
# Vinyl side notation: A1 → disc 1 track 1, B2 → disc 1 track 6, etc.
try:
track_number = int(''.join(c for c in position if c.isdigit()) or '0') or None
except ValueError:
pass
# Duration string "5:23" → ms
duration_ms = 0
dur_str = track_data.get('duration', '')
if dur_str and ':' in dur_str:
parts = dur_str.split(':')
try:
duration_ms = (int(parts[0]) * 60 + int(parts[1])) * 1000
except (ValueError, IndexError):
pass
# Artists from track-level or release-level
track_artists = []
if track_data.get('artists'):
track_artists = [a.get('name', '') for a in track_data['artists'] if a.get('name')]
if not track_artists and release.get('artists'):
track_artists = [a.get('name', '') for a in release['artists'] if a.get('name')]
if not track_artists:
track_artists = ['Unknown Artist']
# Image from release
image_url = None
images = release.get('images', [])
if images:
# Prefer 'primary' type, fall back to first
primary = next((img for img in images if img.get('type') == 'primary'), None)
image_url = (primary or images[0]).get('uri')
# Album type
total_tracks = len(release.get('tracklist', []))
formats = release.get('formats', [{}])
format_name = formats[0].get('name', '') if formats else ''
external_urls = {}
if release.get('uri'):
external_urls['discogs'] = f"https://www.discogs.com{release['uri']}" if release['uri'].startswith('/') else release['uri']
return cls(
id=str(release.get('id', '')) + f'_t{track_number or 0}',
name=track_data.get('title', ''),
artists=track_artists,
album=release.get('title', ''),
duration_ms=duration_ms,
popularity=release.get('community', {}).get('have', 0),
external_urls=external_urls if external_urls else None,
image_url=image_url,
release_date=str(release.get('year', '')) if release.get('year') else None,
track_number=track_number,
disc_number=disc_number,
album_type='album',
total_tracks=total_tracks,
)
@dataclass
class Artist:
id: str
name: str
popularity: int
genres: List[str]
followers: int
image_url: Optional[str] = None
external_urls: Optional[Dict[str, str]] = None
@classmethod
def from_discogs_artist(cls, artist_data: Dict[str, Any]) -> 'Artist':
# Images — prefer primary
image_url = None
images = artist_data.get('images', [])
if images:
primary = next((img for img in images if img.get('type') == 'primary'), None)
image_url = (primary or images[0]).get('uri')
# Search results use 'thumb' or 'cover_image'
if not image_url:
image_url = artist_data.get('cover_image') or artist_data.get('thumb')
if image_url and 'spacer.gif' in image_url:
image_url = None
external_urls = {}
if artist_data.get('uri'):
uri = artist_data['uri']
external_urls['discogs'] = f"https://www.discogs.com{uri}" if uri.startswith('/') else uri
elif artist_data.get('resource_url'):
external_urls['discogs_api'] = artist_data['resource_url']
return cls(
id=str(artist_data.get('id', '')),
name=artist_data.get('name', artist_data.get('title', '')),
popularity=0,
genres=[],
followers=0,
image_url=image_url,
external_urls=external_urls if external_urls else None,
)
@dataclass
class Album:
id: str
name: str
artists: List[str]
release_date: str
total_tracks: int
album_type: str
image_url: Optional[str] = None
external_urls: Optional[Dict[str, str]] = None
@classmethod
def from_discogs_release(cls, release_data: Dict[str, Any]) -> 'Album':
# Artists — search results put "Artist - Title" in the title field
artists = []
title = release_data.get('title', '')
if release_data.get('artists'):
artists = [a.get('name', '') for a in release_data['artists'] if a.get('name')]
elif release_data.get('artist'):
artists = [release_data['artist']]
elif ' - ' in title:
# Search results: "Radiohead - OK Computer" → artist="Radiohead", title="OK Computer"
parts = title.split(' - ', 1)
artists = [parts[0].strip()]
title = parts[1].strip()
if not artists:
artists = ['Unknown Artist']
# Image
image_url = None
images = release_data.get('images', [])
if images:
primary = next((img for img in images if img.get('type') == 'primary'), None)
image_url = (primary or images[0]).get('uri')
if not image_url:
image_url = release_data.get('cover_image') or release_data.get('thumb')
if image_url and 'spacer.gif' in image_url:
image_url = None
# Track count
tracklist = release_data.get('tracklist', [])
total_tracks = len(tracklist) if tracklist else (release_data.get('format_quantity', 0) or 0)
# Album type from formats array (full release detail) or format string (search/artist releases)
formats = release_data.get('formats', [])
format_name = formats[0].get('name', '').lower() if formats else ''
descriptions = [d.lower() for d in formats[0].get('descriptions', [])] if formats else []
# Also check the 'format' field from search/artist release endpoints
# Can be a string "Vinyl, LP, Album" or a list ["Vinyl", "LP", "Album"]
raw_format = release_data.get('format') or ''
if isinstance(raw_format, list):
format_str = ', '.join(raw_format).lower()
else:
format_str = str(raw_format).lower()
if 'single' in descriptions or 'single' in format_name or 'single' in format_str:
album_type = 'single'
elif 'ep' in descriptions or ', ep' in format_str or format_str.endswith('ep'):
album_type = 'ep'
elif 'compilation' in descriptions or 'compilation' in format_str or 'compilation' in (release_data.get('type', '') or '').lower():
album_type = 'compilation'
elif 'lp' in descriptions or 'lp' in format_str or 'album' in descriptions or 'album' in format_str:
album_type = 'album'
elif total_tracks <= 3 and total_tracks > 0:
album_type = 'single'
elif total_tracks <= 6 and total_tracks > 0:
album_type = 'ep'
else:
album_type = 'album'
# Year
year = release_data.get('year', '')
release_date = str(year) if year and year != 0 else ''
external_urls = {}
if release_data.get('uri'):
uri = release_data['uri']
external_urls['discogs'] = f"https://www.discogs.com{uri}" if uri.startswith('/') else uri
elif release_data.get('resource_url'):
external_urls['discogs_api'] = release_data['resource_url']
return cls(
id=str(release_data.get('id', '')),
name=title,
artists=artists,
release_date=release_date,
total_tracks=total_tracks,
album_type=album_type,
image_url=image_url,
external_urls=external_urls if external_urls else None,
)
class DiscogsClient:
"""
Discogs API client for music metadata.
Full parity with iTunesClient/DeezerClient — same method signatures,
same return types (Track, Artist, Album dataclasses).
Rate limit: 25 req/min unauthenticated, 60 req/min with personal token.
"""
BASE_URL = "https://api.discogs.com"
def __init__(self, token: str = None):
global _is_authenticated
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'SoulSync/2.2 +https://github.com/Nezreka/SoulSync',
'Accept': 'application/json',
})
# Load token from config or parameter
self.token = token
if not self.token:
try:
from config.settings import config_manager
self.token = config_manager.get('discogs.token', '')
except Exception:
pass
if self.token:
self.session.headers['Authorization'] = f'Discogs token={self.token}'
_is_authenticated = True
logger.info("Discogs client initialized (authenticated — 60 req/min)")
else:
_is_authenticated = False
logger.info("Discogs client initialized (unauthenticated — 25 req/min)")
def is_authenticated(self) -> bool:
return bool(self.token)
def is_configured(self) -> bool:
return True # Works without auth
@staticmethod
def _normalize_name(name: str) -> str:
"""Normalize a name for comparison — lowercase, strip parentheticals and punctuation."""
name = name.lower().strip()
name = re.sub(r'\s*\(.*?\)\s*', ' ', name)
name = re.sub(r'[^\w\s]', '', name)
name = re.sub(r'\s+', ' ', name).strip()
return name
# --- Core API Methods ---
@rate_limited
def _api_get(self, endpoint: str, params: Dict = None) -> Optional[Dict]:
"""Make a GET request to the Discogs API."""
try:
url = f"{self.BASE_URL}{endpoint}" if endpoint.startswith('/') else endpoint
resp = self.session.get(url, params=params, timeout=15)
if resp.status_code == 429:
logger.warning("Discogs rate limit hit")
time.sleep(30)
return None
if resp.status_code != 200:
logger.debug(f"Discogs API {endpoint} returned {resp.status_code}")
return None
return resp.json()
except Exception as e:
logger.error(f"Discogs API error ({endpoint}): {e}")
return None
# --- Search Methods (same signatures as iTunes/Deezer) ---
def search_artists(self, query: str, limit: int = 10) -> List[Artist]:
"""Search for artists on Discogs."""
cache = get_metadata_cache()
cached_results = cache.get_search_results('discogs', 'artist', query, limit)
if cached_results is not None:
artists = []
for raw in cached_results:
try:
artists.append(Artist.from_discogs_artist(raw))
except Exception:
pass
if artists:
return artists
data = self._api_get('/database/search', {
'q': query, 'type': 'artist', 'per_page': min(limit, 50),
})
if not data or not data.get('results'):
return []
artists = []
raw_items = []
for item in data['results'][:limit]:
try:
artists.append(Artist.from_discogs_artist(item))
raw_items.append(item)
except Exception as e:
logger.debug(f"Error parsing Discogs artist: {e}")
if raw_items:
entries = [(str(r.get('id', '')), r) for r in raw_items if r.get('id')]
if entries:
cache.store_entities_bulk('discogs', 'artist', entries)
cache.store_search_results('discogs', 'artist', query, limit,
[str(r.get('id', '')) for r in raw_items if r.get('id')])
return artists
def search_albums(self, query: str, limit: int = 10) -> List[Album]:
"""Search for releases/albums on Discogs."""
cache = get_metadata_cache()
cached_results = cache.get_search_results('discogs', 'album', query, limit)
if cached_results is not None:
albums = []
for raw in cached_results:
try:
albums.append(Album.from_discogs_release(raw))
except Exception:
pass
if albums:
return albums
data = self._api_get('/database/search', {
'q': query, 'type': 'release', 'per_page': min(limit, 50),
})
if not data or not data.get('results'):
return []
albums = []
raw_items = []
seen_titles = set()
for item in data['results'][:limit * 2]:
try:
album = Album.from_discogs_release(item)
dedup_key = f"{album.name.lower()}|{album.artists[0].lower() if album.artists else ''}"
if dedup_key in seen_titles:
continue
seen_titles.add(dedup_key)
albums.append(album)
raw_items.append(item)
if len(albums) >= limit:
break
except Exception as e:
logger.debug(f"Error parsing Discogs release: {e}")
if raw_items:
entries = [(str(r.get('id', '')), r) for r in raw_items if r.get('id')]
if entries:
cache.store_entities_bulk('discogs', 'album', entries, skip_if_exists=True)
cache.store_search_results('discogs', 'album', query, limit,
[str(r.get('id', '')) for r in raw_items if r.get('id')])
return albums
def search_tracks(self, query: str, limit: int = 10) -> List[Track]:
"""Search for tracks on Discogs.
Discogs doesn't have a track-level search API — returns empty list.
Track data is available via get_album() tracklists instead."""
# Discogs has no track search endpoint. Artists and albums are the
# searchable entities. Individual tracks come from release tracklists.
return []
# --- Lookup Methods ---
def get_artist(self, artist_id: str) -> Optional[Dict[str, Any]]:
"""Get artist details by Discogs ID."""
cache = get_metadata_cache()
cached = cache.get_entity('discogs', 'artist', artist_id)
if cached and cached.get('name'):
# Rebuild normalized result from cached raw data
data = cached
else:
data = self._api_get(f'/artists/{artist_id}')
if not data:
return None
cache.store_entity('discogs', 'artist', artist_id, data)
artist = Artist.from_discogs_artist(data)
# Get profile/bio
profile = data.get('profile', '')
result = {
'id': artist.id,
'name': artist.name,
'image_url': artist.image_url,
'genres': [],
'popularity': 0,
'followers': 0,
'bio': profile,
'external_urls': artist.external_urls,
'images': [{'url': artist.image_url}] if artist.image_url else [],
}
return result
def get_album(self, release_id: str, include_tracks: bool = True) -> Optional[Dict[str, Any]]:
"""Get release/album details by Discogs ID. Tries master first, falls back to release."""
cache = get_metadata_cache()
cached = cache.get_entity('discogs', 'album', release_id)
if cached and cached.get('title'):
data = cached
else:
# Try as master first (artist discography returns master IDs)
data = self._api_get(f'/masters/{release_id}')
if not data or not data.get('title'):
data = self._api_get(f'/releases/{release_id}')
if not data:
return None
cache.store_entity('discogs', 'album', release_id, data)
album = Album.from_discogs_release(data)
result = {
'id': album.id,
'name': album.name,
'artist': album.artists[0] if album.artists else '',
'artists': album.artists,
'release_date': album.release_date,
'total_tracks': album.total_tracks,
'album_type': album.album_type,
'image_url': album.image_url,
'images': [{'url': album.image_url}] if album.image_url else [],
'external_urls': album.external_urls,
'genres': data.get('genres', []),
'styles': data.get('styles', []),
'label': data.get('labels', [{}])[0].get('name', '') if data.get('labels') else '',
'catalog_number': data.get('labels', [{}])[0].get('catno', '') if data.get('labels') else '',
}
if include_tracks and data.get('tracklist'):
result['tracks'] = {
'items': [self._tracklist_to_spotify_format(t, data) for t in data['tracklist']
if t.get('type_', '') == 'track' or not t.get('type_')]
}
return result
def get_artist_albums(self, artist_id: str, album_type: str = 'album,single', limit: int = 50) -> List[Album]:
"""Get releases by an artist. Prefers master releases, filters features."""
# First get the artist name for feature filtering
artist_data = self._api_get(f'/artists/{artist_id}')
artist_name = artist_data.get('name', '').lower() if artist_data else ''
data = self._api_get(f'/artists/{artist_id}/releases', {
'sort': 'year', 'sort_order': 'desc', 'per_page': min(limit * 3, 200),
})
if not data or not data.get('releases'):
return []
# Separate masters from individual releases — prefer masters (canonical versions)
masters = []
releases_no_master = []
master_titles = set()
for item in data['releases']:
# Skip non-main roles
role = item.get('role', 'Main').lower()
if role not in ('main', ''):
continue
# Filter out features — only include releases where this artist is the PRIMARY artist
# "Beyoncé Feat. Kendrick Lamar" → primary is Beyoncé, skip
# "Kendrick Lamar Feat. Rihanna" → primary is Kendrick, keep
release_artist = item.get('artist', '')
if artist_name and release_artist:
# Get the primary artist (before any Feat./Ft./&)
primary = re.split(r'\s+(?:feat\.?|ft\.?|featuring)\s+', release_artist, flags=re.IGNORECASE)[0]
primary = re.split(r'\s*[&,]\s*', primary)[0].strip()
# Check if our artist is the primary
if self._normalize_name(primary) != self._normalize_name(artist_name):
continue
if item.get('type') == 'master':
masters.append(item)
master_titles.add(item.get('title', '').lower())
else:
releases_no_master.append(item)
# Use masters first, then add releases that don't have a master
ordered = masters + [r for r in releases_no_master if r.get('title', '').lower() not in master_titles]
albums = []
seen_titles = set()
allowed_types = set(album_type.split(','))
for item in ordered:
try:
album = Album.from_discogs_release(item)
# Use thumb from release list as image
thumb = item.get('thumb') or item.get('cover_image') or ''
if thumb and 'spacer.gif' not in thumb and not album.image_url:
album = Album(id=album.id, name=album.name, artists=album.artists,
release_date=album.release_date, total_tracks=album.total_tracks,
album_type=album.album_type, image_url=thumb,
external_urls=album.external_urls)
# Deduplicate by normalized title (but keep deluxe/special editions as separate)
dedup_key = album.name.lower().strip()
if dedup_key in seen_titles:
continue
seen_titles.add(dedup_key)
# Filter by requested type
if album.album_type in allowed_types:
albums.append(album)
if len(albums) >= limit:
break
except Exception as e:
logger.debug(f"Error parsing Discogs artist release: {e}")
return albums
def get_album_tracks(self, release_id: str) -> Optional[Dict[str, Any]]:
"""Get album tracks by Discogs release or master ID. Returns Spotify-compatible format."""
cache = get_metadata_cache()
cache_key = f"{release_id}_tracks"
cached = cache.get_entity('discogs', 'album', cache_key)
if cached:
return cached
# Try as master first (master IDs are used in artist discography)
data = self._api_get(f'/masters/{release_id}')
if not data or not data.get('tracklist'):
data = self._api_get(f'/releases/{release_id}')
if not data or not data.get('tracklist'):
return None
# Get album image
image_url = None
images = data.get('images', [])
if images:
primary = next((img for img in images if img.get('type') == 'primary'), None)
image_url = (primary or images[0]).get('uri')
album_info = {
'id': str(data.get('id', release_id)),
'name': data.get('title', ''),
'images': [{'url': image_url, 'height': 600, 'width': 600}] if image_url else [],
'release_date': str(data.get('year', '')) if data.get('year') else '',
}
# Get artists
artists_list = []
if data.get('artists'):
artists_list = [{'name': a.get('name', '')} for a in data['artists'] if a.get('name')]
if not artists_list:
artists_list = [{'name': 'Unknown Artist'}]
tracks = []
track_num = 0
disc_num = 1
for t in data['tracklist']:
if t.get('type_') == 'heading':
disc_num += 1
continue
if t.get('type_', '') not in ('track', ''):
continue
track_num += 1
# Parse duration
duration_ms = 0
dur_str = t.get('duration', '')
if dur_str and ':' in dur_str:
parts = dur_str.split(':')
try:
duration_ms = (int(parts[0]) * 60 + int(parts[1])) * 1000
except (ValueError, IndexError):
pass
# Per-track artists or fall back to release artists
track_artists = artists_list
if t.get('artists'):
track_artists = [{'name': a.get('name', '')} for a in t['artists'] if a.get('name')]
tracks.append({
'id': f"{release_id}_t{track_num}",
'name': t.get('title', ''),
'artists': track_artists,
'album': album_info,
'duration_ms': duration_ms,
'track_number': track_num,
'disc_number': disc_num if disc_num > 1 else 1,
'explicit': False,
'uri': f"discogs:track:{release_id}_{track_num}",
'external_urls': {},
'_source': 'discogs',
})
result = {
'items': tracks,
'total': len(tracks),
'limit': len(tracks),
'next': None,
}
cache.store_entity('discogs', 'album', cache_key, result)
return result
def _fetch_and_cache_artist(self, artist_id: str) -> Optional[Dict]:
"""Fetch raw artist data with cache. Used by enrichment worker."""
cache = get_metadata_cache()
cached = cache.get_entity('discogs', 'artist', str(artist_id))
if cached and cached.get('name'):
return cached
data = self._api_get(f'/artists/{artist_id}')
if data:
cache.store_entity('discogs', 'artist', str(artist_id), data)
return data
def _fetch_and_cache_album(self, release_id: str) -> Optional[Dict]:
"""Fetch raw album/release data with cache. Used by enrichment worker."""
cache = get_metadata_cache()
cached = cache.get_entity('discogs', 'album', str(release_id))
if cached and cached.get('title'):
return cached
data = self._api_get(f'/masters/{release_id}')
if not data or not data.get('title'):
data = self._api_get(f'/releases/{release_id}')
if data:
cache.store_entity('discogs', 'album', str(release_id), data)
return data
def _get_artist_image_from_albums(self, artist_id: str) -> Optional[str]:
"""Get artist image by fetching their first album's cover art.
Used as fallback when artist has no direct image."""
data = self._api_get(f'/artists/{artist_id}/releases', {
'sort': 'year', 'sort_order': 'desc', 'per_page': 5,
})
if not data or not data.get('releases'):
return None
for release in data['releases']:
thumb = release.get('thumb')
if thumb and 'spacer.gif' not in thumb:
return thumb
return None
# --- Helpers ---
def _tracklist_to_spotify_format(self, track_data: Dict, release_data: Dict) -> Dict:
"""Convert a Discogs tracklist entry to Spotify-compatible track dict."""
t = Track.from_discogs_track(track_data, release_data)
return {
'id': t.id,
'name': t.name,
'artists': [{'name': a} for a in t.artists],
'track_number': t.track_number,
'disc_number': t.disc_number,
'duration_ms': t.duration_ms,
}