Refactor album track lookup to use source priority

- Move album-track resolution into metadata_service
- Use the configured provider order instead of Spotify-first branching
- Switch the frontend to the unified /api/album/<id>/tracks endpoint
- Add tests for source-priority lookup, DB resolution, and formatting
pull/339/head
Antti Kettunen 4 weeks ago
parent 71bff55c6a
commit 24abae6908
No known key found for this signature in database
GPG Key ID: C6B2A3D250359BD7

@ -148,6 +148,24 @@ def get_album_tracks_for_source(source: str, album_id: str):
return None
def get_album_for_source(source: str, album_id: str):
"""Get album metadata for an exact source.
Returns a provider-normalized album dict or None.
No fallback swaps.
"""
client = get_client_for_source(source)
if not client or not hasattr(client, 'get_album'):
return None
try:
if source == 'spotify':
return client.get_album(album_id, allow_fallback=False)
return client.get_album(album_id)
except Exception:
return None
def get_artist_albums_for_source(
source: str,
artist_id: str,
@ -259,6 +277,20 @@ def _search_artists_for_source(source: str, client: Any, artist_name: str, limit
return []
def _search_albums_for_source(source: str, client: Any, query: str, limit: int = 5) -> List[Any]:
if not client or not hasattr(client, 'search_albums'):
return []
try:
kwargs = {'limit': limit}
if source == 'spotify':
kwargs['allow_fallback'] = False
return client.search_albums(query, **kwargs) or []
except Exception as exc:
logger.debug("Could not search %s for %s: %s", source, query, exc)
return []
def _pick_best_artist_match(search_results: List[Any], artist_name: str) -> Optional[Any]:
"""Prefer an exact artist-name match, otherwise use the first result."""
if not search_results:
@ -641,6 +673,317 @@ def _extract_track_items(api_tracks: Any) -> List[Dict[str, Any]]:
return []
def _normalize_track_artists(track_item: Any) -> List[str]:
artists = _extract_lookup_value(track_item, 'artists', default=[]) or []
if isinstance(artists, (str, bytes)):
artists = [artists]
elif isinstance(artists, dict):
artists = [artists]
else:
try:
artists = list(artists)
except TypeError:
artists = [artists]
normalized = []
for artist in artists:
artist_name = _extract_lookup_value(artist, 'name', 'artist_name', 'title')
if not artist_name and isinstance(artist, str):
artist_name = artist
if artist_name:
normalized.append(str(artist_name))
return normalized
def _extract_album_track_items(album_data: Any, tracks_data: Any = None) -> List[Dict[str, Any]]:
embedded_tracks = _extract_lookup_value(album_data, 'tracks', default=None)
if isinstance(embedded_tracks, dict):
items = embedded_tracks.get('items') or []
if items:
return items
elif isinstance(embedded_tracks, list):
if embedded_tracks:
return embedded_tracks
return _extract_track_items(tracks_data)
def _build_album_info(album_data: Any, album_id: str, album_name: str = '', artist_name: str = '') -> Dict[str, Any]:
images = _extract_lookup_value(album_data, 'images', default=[]) or []
if not isinstance(images, list):
images = list(images) if images else []
image_url = None
if images:
image_url = _extract_lookup_value(images[0], 'url')
if not image_url:
image_url = _extract_lookup_value(album_data, 'image_url', 'thumb_url')
return {
'id': _extract_lookup_value(album_data, 'id', 'album_id', 'collectionId', 'release_id', default=album_id) or album_id,
'name': _extract_lookup_value(album_data, 'name', 'title', default=album_name or album_id) or album_name or album_id,
'image_url': image_url,
'images': images,
'release_date': _extract_lookup_value(album_data, 'release_date', default='') or '',
'album_type': _extract_lookup_value(album_data, 'album_type', default='album') or 'album',
'total_tracks': _extract_lookup_value(album_data, 'total_tracks', 'track_count', default=0) or 0,
'artist_name': artist_name or _extract_lookup_value(album_data, 'artist_name', default='') or '',
}
def _build_album_track_entry(track_item: Any, album_info: Dict[str, Any], source: str) -> Dict[str, Any]:
explicit_value = _extract_lookup_value(track_item, 'explicit', 'trackExplicitness', default=False)
if isinstance(explicit_value, str):
explicit_value = explicit_value.lower() == 'explicit'
return {
'id': _extract_lookup_value(track_item, 'id', 'track_id', 'trackId', default='') or '',
'name': _extract_lookup_value(track_item, 'name', 'track_name', 'trackName', default='Unknown Track') or 'Unknown Track',
'artists': _normalize_track_artists(track_item),
'duration_ms': _extract_lookup_value(track_item, 'duration_ms', 'trackTimeMillis', default=0) or 0,
'track_number': _extract_lookup_value(track_item, 'track_number', 'trackNumber', default=0) or 0,
'disc_number': _extract_lookup_value(track_item, 'disc_number', 'discNumber', default=1) or 1,
'explicit': bool(explicit_value),
'preview_url': _extract_lookup_value(track_item, 'preview_url', 'previewUrl'),
'external_urls': _extract_lookup_value(track_item, 'external_urls', default={}) or {},
'uri': _extract_lookup_value(track_item, 'uri', default='') or '',
'album': album_info,
'_source': source,
}
def _build_album_tracks_payload(
album_data: Any,
tracks_data: Any,
source: str,
album_id: str,
album_name: str = '',
artist_name: str = '',
) -> Dict[str, Any]:
album_info = _build_album_info(album_data, album_id, album_name=album_name, artist_name=artist_name)
track_items = _extract_album_track_items(album_data, tracks_data)
tracks = [_build_album_track_entry(track, album_info, source) for track in track_items]
return {
'success': bool(tracks),
'album': album_info,
'tracks': tracks,
'source': source,
}
def resolve_album_reference(
album_id: str,
preferred_source: Optional[str] = None,
album_name: str = '',
artist_name: str = '',
) -> tuple[Optional[str], Optional[str]]:
"""Resolve a local database album ID or name-based reference to a provider ID."""
try:
from database.music_database import get_database
database = get_database()
with database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(albums)")
album_columns = {row[1] for row in cursor.fetchall()}
source_chain = list(get_source_priority(preferred_source or get_primary_source()))
override = (preferred_source or '').strip().lower()
if override:
source_chain = [override] + [source for source in source_chain if source != override]
source_columns = {
'spotify': ('spotify_album_id',),
'deezer': ('deezer_id', 'deezer_album_id'),
'itunes': ('itunes_album_id',),
'discogs': ('discogs_id',),
'hydrabase': ('soul_id', 'hydrabase_album_id'),
}
select_columns = ["a.title", "ar.name as artist_name"]
for columns in source_columns.values():
for column in columns:
if column in album_columns:
select_columns.append(f"a.{column}")
cursor.execute(
"""
SELECT {select_columns}
FROM albums a
JOIN artists ar ON a.artist_id = ar.id
WHERE a.id = ?
""".format(select_columns=", ".join(select_columns)),
(album_id,),
)
row = cursor.fetchone()
if row:
for source in source_chain:
for column in source_columns.get(source, ()):
if column not in row.keys():
continue
value = row[column]
if value:
return value, source
search_title = album_name or row['title']
search_artist = artist_name or row['artist_name']
query = f"{search_artist} {search_title}".strip()
for source in source_chain:
client = get_client_for_source(source)
if not client:
continue
results = _search_albums_for_source(source, client, query, limit=5)
if results:
for album in results:
candidate_name = str(_extract_lookup_value(album, 'name', 'title', default='') or '').strip().lower()
if candidate_name and candidate_name == str(search_title).strip().lower():
return _extract_lookup_value(album, 'id', 'album_id', 'release_id'), source
best = results[0]
return _extract_lookup_value(best, 'id', 'album_id', 'release_id'), source
if not album_name and not artist_name:
return None, None
query = " ".join(part for part in (artist_name, album_name) if part).strip() or album_id
for source in source_chain:
client = get_client_for_source(source)
if not client:
continue
results = _search_albums_for_source(source, client, query, limit=5)
if results:
for album in results:
candidate_name = str(_extract_lookup_value(album, 'name', 'title', default='') or '').strip().lower()
if album_name and candidate_name == album_name.strip().lower():
return _extract_lookup_value(album, 'id', 'album_id', 'release_id'), source
best = results[0]
return _extract_lookup_value(best, 'id', 'album_id', 'release_id'), source
except Exception as e:
logger.debug("Error resolving album reference %s: %s", album_id, e)
return None, None
def get_artist_album_tracks(
album_id: str,
artist_name: str = '',
album_name: str = '',
source_override: Optional[str] = None,
) -> Dict[str, Any]:
"""Get a normalized album-track payload using source-priority lookup."""
source_chain = _get_source_chain_for_lookup(
MetadataLookupOptions(source_override=source_override, allow_fallback=True)
)
preferred_source = source_chain[0] if source_chain else None
for source in source_chain:
client = get_client_for_source(source)
if not client:
continue
album_data = get_album_for_source(source, album_id)
if not album_data:
continue
tracks_data = None
if not _extract_album_track_items(album_data):
tracks_data = get_album_tracks_for_source(source, album_id)
payload = _build_album_tracks_payload(
album_data,
tracks_data,
source,
album_id,
album_name=album_name,
artist_name=artist_name,
)
if payload['tracks']:
payload['success'] = True
payload['source_priority'] = source_chain
payload['resolved_album_id'] = album_id
return payload
resolved_album_id, resolved_source = resolve_album_reference(
album_id,
preferred_source=preferred_source,
album_name=album_name,
artist_name=artist_name,
)
if resolved_album_id:
retry_sources = []
if resolved_source:
retry_sources.append(resolved_source)
retry_sources.extend(source for source in source_chain if source not in retry_sources)
for source in retry_sources:
client = get_client_for_source(source)
if not client:
continue
album_data = get_album_for_source(source, resolved_album_id)
if not album_data:
continue
tracks_data = None
if not _extract_album_track_items(album_data):
tracks_data = get_album_tracks_for_source(source, resolved_album_id)
payload = _build_album_tracks_payload(
album_data,
tracks_data,
source,
resolved_album_id,
album_name=album_name,
artist_name=artist_name,
)
if payload['tracks']:
payload['success'] = True
payload['source_priority'] = source_chain
payload['resolved_album_id'] = resolved_album_id
return payload
# Keep trying the remaining sources in case another provider has the track listing.
continue
if resolved_album_id:
return {
'success': False,
'error': 'No tracks found for album — it may be region-restricted or unavailable on this metadata source',
'status_code': 404,
'source_priority': source_chain,
'resolved_album_id': resolved_album_id,
'tracks': [],
'album': {
'id': resolved_album_id,
'name': album_name or resolved_album_id,
'image_url': None,
'images': [],
'release_date': '',
'album_type': 'album',
'total_tracks': 0,
},
}
return {
'success': False,
'error': 'Album not found',
'status_code': 404,
'source_priority': source_chain,
'resolved_album_id': None,
'tracks': [],
'album': {
'id': album_id,
'name': album_name or album_id,
'image_url': None,
'images': [],
'release_date': '',
'album_type': 'album',
'total_tracks': 0,
},
}
def _resolve_completion_artist_name(
discography: Dict[str, Any],
artist_name: str,

@ -0,0 +1,248 @@
import sqlite3
import sys
import types
import pytest
if "spotipy" not in sys.modules:
spotipy = types.ModuleType("spotipy")
class _DummySpotify:
def __init__(self, *args, **kwargs):
pass
oauth2 = types.ModuleType("spotipy.oauth2")
class _DummyOAuth:
def __init__(self, *args, **kwargs):
pass
spotipy.Spotify = _DummySpotify
oauth2.SpotifyOAuth = _DummyOAuth
oauth2.SpotifyClientCredentials = _DummyOAuth
spotipy.oauth2 = oauth2
sys.modules["spotipy"] = spotipy
sys.modules["spotipy.oauth2"] = oauth2
if "config.settings" not in sys.modules:
config_pkg = types.ModuleType("config")
settings_mod = types.ModuleType("config.settings")
class _DummyConfigManager:
def get(self, key, default=None):
return default
def get_active_media_server(self):
return "primary"
settings_mod.config_manager = _DummyConfigManager()
config_pkg.settings = settings_mod
sys.modules["config"] = config_pkg
sys.modules["config.settings"] = settings_mod
from core import metadata_service
@pytest.fixture(autouse=True)
def _clear_metadata_client_cache():
metadata_service.clear_cached_metadata_clients()
yield
metadata_service.clear_cached_metadata_clients()
def _album(album_id="album-1", name="Album One", album_type="album"):
return {
"id": album_id,
"name": name,
"images": [{"url": f"https://img.example/{album_id}.jpg"}],
"release_date": "2024-01-01",
"album_type": album_type,
"total_tracks": 1,
}
def _track(track_id="track-1", name="Track One"):
return {
"id": track_id,
"name": name,
"artists": [{"name": "Artist One"}],
"duration_ms": 123456,
"track_number": 1,
"disc_number": 1,
"explicit": "explicit",
"preview_url": "https://preview.example/track-1",
"external_urls": {"spotify": "https://example/track-1"},
"uri": f"spotify:track:{track_id}",
}
def test_get_artist_album_tracks_uses_primary_source_priority(monkeypatch):
calls = []
monkeypatch.setattr(metadata_service, "get_primary_source", lambda: "deezer")
monkeypatch.setattr(metadata_service, "get_source_priority", lambda primary: [primary, "spotify", "itunes"])
monkeypatch.setattr(metadata_service, "get_client_for_source", lambda source: object())
def fake_get_album_for_source(source, album_id):
calls.append(("album", source, album_id))
return _album("album-1", "Album One") if source == "deezer" and album_id == "album-1" else None
def fake_get_album_tracks_for_source(source, album_id):
calls.append(("tracks", source, album_id))
return {"items": [_track()]} if source == "deezer" and album_id == "album-1" else None
monkeypatch.setattr(metadata_service, "get_album_for_source", fake_get_album_for_source)
monkeypatch.setattr(metadata_service, "get_album_tracks_for_source", fake_get_album_tracks_for_source)
result = metadata_service.get_artist_album_tracks(
"album-1",
artist_name="Artist One",
album_name="Album One",
)
assert result["success"] is True
assert result["source"] == "deezer"
assert result["source_priority"] == ["deezer", "spotify", "itunes"]
assert result["resolved_album_id"] == "album-1"
assert result["album"]["image_url"] == "https://img.example/album-1.jpg"
assert result["tracks"][0]["artists"] == ["Artist One"]
assert result["tracks"][0]["explicit"] is True
assert calls == [("album", "deezer", "album-1"), ("tracks", "deezer", "album-1")]
def test_get_artist_album_tracks_resolves_database_album_reference(monkeypatch):
calls = []
monkeypatch.setattr(metadata_service, "get_primary_source", lambda: "deezer")
monkeypatch.setattr(metadata_service, "get_source_priority", lambda primary: [primary, "spotify", "itunes"])
monkeypatch.setattr(metadata_service, "get_client_for_source", lambda source: object())
def fake_get_album_for_source(source, album_id):
calls.append(("album", source, album_id))
if source == "itunes" and album_id == "itunes-123":
return _album("itunes-123", "Resolved Album")
return None
def fake_get_album_tracks_for_source(source, album_id):
calls.append(("tracks", source, album_id))
if source == "itunes" and album_id == "itunes-123":
return {"items": [_track("itunes-track-1", "Resolved Track")]}
return None
def fake_resolve_album_reference(album_id, preferred_source=None, album_name="", artist_name=""):
assert album_id == "db-1"
assert preferred_source == "itunes"
return "itunes-123", "itunes"
monkeypatch.setattr(metadata_service, "get_album_for_source", fake_get_album_for_source)
monkeypatch.setattr(metadata_service, "get_album_tracks_for_source", fake_get_album_tracks_for_source)
monkeypatch.setattr(metadata_service, "resolve_album_reference", fake_resolve_album_reference)
result = metadata_service.get_artist_album_tracks(
"db-1",
artist_name="Artist One",
album_name="Album One",
source_override="itunes",
)
assert result["success"] is True
assert result["source"] == "itunes"
assert result["resolved_album_id"] == "itunes-123"
assert result["tracks"][0]["name"] == "Resolved Track"
assert ("album", "itunes", "itunes-123") in calls
assert ("tracks", "itunes", "itunes-123") in calls
def test_resolve_album_reference_prefers_stored_external_id(monkeypatch):
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("CREATE TABLE artists (id INTEGER PRIMARY KEY, name TEXT)")
cursor.execute(
"""
CREATE TABLE albums (
id INTEGER PRIMARY KEY,
title TEXT,
artist_id INTEGER,
spotify_album_id TEXT,
itunes_album_id TEXT,
deezer_id TEXT,
deezer_album_id TEXT,
discogs_id TEXT,
soul_id TEXT,
hydrabase_album_id TEXT
)
"""
)
cursor.execute("INSERT INTO artists (id, name) VALUES (1, 'Artist One')")
cursor.execute(
"""
INSERT INTO albums (id, title, artist_id, deezer_id)
VALUES (1, 'Album One', 1, 'deezer-abc')
"""
)
conn.commit()
class _FakeDatabase:
def _get_connection(self):
return conn
monkeypatch.setattr("database.music_database.get_database", lambda: _FakeDatabase())
monkeypatch.setattr(metadata_service, "get_primary_source", lambda: "deezer")
monkeypatch.setattr(metadata_service, "get_source_priority", lambda primary: [primary, "spotify"])
resolved_id, resolved_source = metadata_service.resolve_album_reference("1", preferred_source="deezer")
assert resolved_id == "deezer-abc"
assert resolved_source == "deezer"
def test_resolve_album_reference_searches_by_name_when_no_external_id_exists(monkeypatch):
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("CREATE TABLE artists (id INTEGER PRIMARY KEY, name TEXT)")
cursor.execute(
"""
CREATE TABLE albums (
id INTEGER PRIMARY KEY,
title TEXT,
artist_id INTEGER,
spotify_album_id TEXT,
itunes_album_id TEXT,
deezer_id TEXT,
deezer_album_id TEXT,
discogs_id TEXT,
soul_id TEXT,
hydrabase_album_id TEXT
)
"""
)
cursor.execute("INSERT INTO artists (id, name) VALUES (1, 'Artist One')")
cursor.execute("INSERT INTO albums (id, title, artist_id) VALUES (1, 'Album One', 1)")
conn.commit()
class _FakeDatabase:
def _get_connection(self):
return conn
class _FakeSearchClient:
def __init__(self):
self.calls = []
def search_albums(self, query, **kwargs):
self.calls.append((query, dict(kwargs)))
return [types.SimpleNamespace(id="searched-123", name="Album One")]
fake_client = _FakeSearchClient()
monkeypatch.setattr("database.music_database.get_database", lambda: _FakeDatabase())
monkeypatch.setattr(metadata_service, "get_primary_source", lambda: "deezer")
monkeypatch.setattr(metadata_service, "get_source_priority", lambda primary: [primary, "spotify"])
monkeypatch.setattr(metadata_service, "get_client_for_source", lambda source: fake_client if source == "deezer" else None)
resolved_id, resolved_source = metadata_service.resolve_album_reference("1", preferred_source="deezer")
assert resolved_id == "searched-123"
assert resolved_source == "deezer"
assert fake_client.calls == [("Artist One Album One", {"limit": 5})]

@ -11492,223 +11492,50 @@ def get_artist_discography(artist_id):
logger.exception("Error fetching artist discography for %s", artist_id)
return jsonify({"error": str(e)}), 500
def _resolve_db_album_id(album_id, artist_id=None):
"""Resolve a database album ID to a real external album ID.
When the artist detail page falls back to owned_releases, the album cards
carry a database ID. This helper looks up stored external IDs first, then
falls back to a search by album title + artist name using the primary
metadata source (with fallback to other sources).
"""
try:
database = get_database()
with database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT a.title, a.spotify_album_id, a.itunes_album_id, a.discogs_id,
a.deezer_album_id, ar.name as artist_name
FROM albums a
JOIN artists ar ON a.artist_id = ar.id
WHERE a.id = ?
""", (album_id,))
row = cursor.fetchone()
if not row:
return None
# Prefer stored external IDs — match the active primary source first
fallback = _get_metadata_fallback_source()
id_priority = {
'spotify': [('spotify_album_id', None), ('deezer_album_id', None), ('itunes_album_id', None), ('discogs_id', None)],
'deezer': [('deezer_album_id', None), ('spotify_album_id', None), ('itunes_album_id', None), ('discogs_id', None)],
'itunes': [('itunes_album_id', None), ('spotify_album_id', None), ('deezer_album_id', None), ('discogs_id', None)],
'discogs': [('discogs_id', None), ('spotify_album_id', None), ('deezer_album_id', None), ('itunes_album_id', None)],
}
for col, _ in id_priority.get(fallback, id_priority['spotify']):
val = row[col] if col in row.keys() else None
if val:
return val
# No stored external ID — search by name using primary source with fallback
album_title = row['title']
artist_name = row['artist_name']
query = f"{artist_name} {album_title}"
logger.debug("Searching for album by name: %s", query)
from core.metadata_service import get_source_priority, get_client_for_source
for source in get_source_priority(fallback):
try:
client = get_client_for_source(source)
if not client:
continue
results = client.search_albums(query, limit=5)
if results:
for album in results:
if album.name.lower().strip() == album_title.lower().strip():
logger.debug("Found exact album match via %s: %s (id=%s)", source, album.name, album.id)
return album.id
logger.debug("No exact match via %s, using best result: %s (id=%s)", source, results[0].name, results[0].id)
return results[0].id
except Exception as e:
logger.debug("Album search via %s failed: %s", source, e)
continue
except Exception as e:
logger.debug("Error resolving DB album ID %s: %s", album_id, e)
return None
@app.route('/api/artist/<artist_id>/album/<album_id>/tracks', methods=['GET'])
def get_artist_album_tracks(artist_id, album_id):
@app.route('/api/album/<album_id>/tracks', methods=['GET'])
def get_album_tracks(album_id):
"""Get tracks for specific album formatted for download missing tracks modal"""
try:
# Try Hydrabase first when active and album name provided
if _is_hydrabase_active():
album_name = request.args.get('name', '')
album_artist = request.args.get('artist', '')
try:
hydra_tracks = hydrabase_client.get_album_tracks(album_id, limit=50)
if hydra_tracks:
album_info = {
'id': album_id,
'name': album_name or hydra_tracks[0].album or '',
'image_url': None,
'images': [],
'release_date': '',
'album_type': 'album',
'total_tracks': len(hydra_tracks)
}
formatted_tracks = []
for t in hydra_tracks:
artist_list = t.artists if isinstance(t.artists, list) else [t.artists] if t.artists else []
formatted_tracks.append({
'id': t.id,
'name': t.name,
'artists': [a if isinstance(a, str) else a for a in artist_list],
'duration_ms': t.duration_ms,
'track_number': t.track_number or 0,
'disc_number': t.disc_number or 1,
'explicit': False,
'preview_url': t.preview_url,
'external_urls': t.external_urls or {},
'uri': '',
'album': album_info
})
logger.info("Hydrabase returned %s tracks for album %s", len(formatted_tracks), album_info['name'])
return jsonify({
'success': True,
'album': album_info,
'tracks': formatted_tracks
})
except Exception as e:
logger.warning(f"Hydrabase album_tracks failed for '{album_id}', falling back to Spotify: {e}")
# Source override: when user navigated from a specific search tab
source_override = request.args.get('source', '')
client = None
if source_override == 'itunes':
client = _get_itunes_client()
elif source_override == 'hydrabase':
plugin = request.args.get('plugin', '').lower()
if plugin == 'deezer':
client = _get_deezer_client()
elif plugin == 'itunes' or album_id.isdigit():
client = _get_itunes_client()
elif source_override == 'deezer':
client = _get_deezer_client()
elif source_override == 'discogs':
client = _get_discogs_client()
album_name = request.args.get('name', '').strip()
artist_name = request.args.get('artist', '').strip()
source_override = request.args.get('source', '').strip().lower()
if source_override == 'hydrabase':
plugin = request.args.get('plugin', '').strip().lower()
if plugin in ('itunes', 'deezer'):
source_override = plugin
elif album_id.isdigit():
source_override = 'itunes'
else:
source_override = 'spotify'
# No source override — use the primary metadata source
if not client:
try:
client = _get_metadata_fallback_client()
except Exception:
pass
# Fall back to Spotify if available
if not client and spotify_client and spotify_client.is_authenticated():
client = spotify_client
if not client:
return jsonify({"error": "No metadata source available. Configure Spotify, Deezer, or iTunes in Settings."}), 401
from core.metadata_service import get_artist_album_tracks as _get_artist_album_tracks
logger.debug(
"Fetching tracks for album %s by artist %s (source=%s)",
result = _get_artist_album_tracks(
album_id,
artist_id,
source_override or 'auto',
artist_name=artist_name,
album_name=album_name,
source_override=source_override or None,
)
# Get album information first
album_data = client.get_album(album_id)
resolved_album_id = album_id
# If direct lookup failed, the album_id might be a database ID — resolve it
if not album_data:
resolved_album_id = _resolve_db_album_id(album_id, artist_id)
if resolved_album_id and resolved_album_id != album_id:
logger.debug("Resolved DB album ID %s -> external ID %s", album_id, resolved_album_id)
album_data = client.get_album(resolved_album_id)
if not album_data:
return jsonify({"error": "Album not found"}), 404
if not result.get('success'):
return jsonify({"error": result.get('error', 'Album not found')}), result.get('status_code', 404)
# Get album tracks
tracks_data = client.get_album_tracks(resolved_album_id)
if not tracks_data or 'items' not in tracks_data or len(tracks_data['items']) == 0:
return jsonify({"error": "No tracks found for album — it may be region-restricted or unavailable on this metadata source"}), 404
# Handle both dict and object responses from spotify_client.get_album()
if isinstance(album_data, dict):
album_info = {
'id': album_data.get('id'),
'name': album_data.get('name'),
'image_url': album_data.get('images', [{}])[0].get('url') if album_data.get('images') else None,
'images': album_data.get('images', []), # Include images array for wishlist cover art
'release_date': album_data.get('release_date'),
'album_type': album_data.get('album_type'),
'total_tracks': album_data.get('total_tracks')
}
else:
# Handle Album object case
album_info = {
'id': album_data.id,
'name': album_data.name,
'image_url': album_data.image_url,
'images': album_data.images if hasattr(album_data, 'images') else [], # Include images array for wishlist cover art
'release_date': album_data.release_date,
'album_type': album_data.album_type,
'total_tracks': album_data.total_tracks
}
# Format tracks for download missing tracks modal compatibility
formatted_tracks = []
for track_item in tracks_data['items']:
# Create track object compatible with download missing tracks modal
formatted_track = {
'id': track_item['id'],
'name': track_item['name'],
'artists': [artist['name'] for artist in track_item['artists']],
'duration_ms': track_item['duration_ms'],
'track_number': track_item['track_number'],
'disc_number': track_item.get('disc_number', 1),
'explicit': track_item.get('explicit', False),
'preview_url': track_item.get('preview_url'),
'external_urls': track_item.get('external_urls', {}),
'uri': track_item['uri'],
# Add album context for virtual playlist
'album': album_info
}
formatted_tracks.append(formatted_track)
logger.info("Successfully formatted %s tracks for album %s", len(formatted_tracks), album_info['name'])
logger.info(
"Successfully formatted %s tracks for album %s",
len(result.get('tracks', [])),
result.get('album', {}).get('name', album_name or album_id),
)
return jsonify({
'success': True,
'album': album_info,
'tracks': formatted_tracks
'album': result['album'],
'tracks': result['tracks'],
'source': result.get('source'),
'source_priority': result.get('source_priority', []),
'resolved_album_id': result.get('resolved_album_id'),
})
except Exception as e:
logger.exception("Error fetching album tracks for artist %s album %s", artist_id, album_id)
logger.exception("Error fetching album tracks for album %s", album_id)
return jsonify({"error": str(e)}), 500
@app.route('/api/artist/<artist_id>/download-discography', methods=['POST'])
@ -33216,7 +33043,7 @@ def get_playlist_tracks(playlist_id):
return jsonify({"error": str(e)}), 500
@app.route('/api/spotify/album/<album_id>', methods=['GET'])
def get_album_tracks(album_id):
def get_spotify_album_tracks(album_id):
"""Fetches full track details for a specific album."""
use_hydrabase = _is_hydrabase_active()

@ -10407,7 +10407,7 @@ async function rehydrateArtistAlbumModal(virtualPlaylistId, playlistName, batchI
// Fetch the album tracks to get proper artist and album data
try {
const response = await fetch(`/api/artist/${artistId}/album/${albumId}/tracks`);
const response = await fetch(`/api/album/${albumId}/tracks`);
const data = await response.json();
if (!data.success || !data.album || !data.tracks) {
@ -21602,7 +21602,7 @@ async function confirmMatch() {
const artistId = currentMatchingData.selectedArtist.id;
const albumId = currentMatchingData.selectedAlbum.id;
const _aat3 = new URLSearchParams({ name: currentMatchingData.selectedAlbum.name || '', artist: currentMatchingData.selectedArtist.name || '' });
const tracksResponse = await fetch(`/api/artist/${artistId}/album/${albumId}/tracks?${_aat3}`);
const tracksResponse = await fetch(`/api/album/${albumId}/tracks?${_aat3}`);
if (!tracksResponse.ok) {
throw new Error(`Failed to fetch Spotify tracks: ${tracksResponse.status}`);
@ -37514,7 +37514,7 @@ async function createArtistAlbumVirtualPlaylist(album, albumType) {
if (artistsPageState.pluginOverride) {
_aat1.set('plugin', artistsPageState.pluginOverride);
}
const response = await fetch(`/api/artist/${artist.id}/album/${album.id}/tracks?${_aat1}`);
const response = await fetch(`/api/album/${album.id}/tracks?${_aat1}`);
if (!response.ok) {
if (response.status === 401) {
@ -45299,7 +45299,7 @@ function createReleaseCard(release) {
// Load tracks for the album (pass name/artist for Hydrabase support)
const _aat2 = new URLSearchParams({ name: albumData.name || '', artist: currentArtist.name || '' });
const response = await fetch(`/api/artist/${currentArtist.id}/album/${albumData.id}/tracks?${_aat2}`);
const response = await fetch(`/api/album/${albumData.id}/tracks?${_aat2}`);
if (!response.ok) {
throw new Error(`Failed to load album tracks: ${response.status}`);
}

Loading…
Cancel
Save