Show MusicBrainz release variants in import

Expand matched MusicBrainz release groups into concrete releases for specific album searches so import users can choose the correct edition by track count, format, country, and disambiguation. Preserve distinct MusicBrainz release IDs instead of deduping same-title variants, carry release metadata through import matching, and surface those details on album result cards. Add coverage for variant preservation and release-group expansion.
pull/692/head
Broque Thomas 1 day ago
parent 7bee424686
commit 4ca3f70bf3

@ -354,6 +354,10 @@ def build_album_import_context(
"images": album.get("images") or ([] if not track_album_image else [{"url": track_album_image}]),
"source": source,
}
for key in ("format", "country", "status", "label", "disambiguation", "release_group_id"):
value = str(album.get(key) or "").strip()
if value:
normalized_album[key] = value
original_search = {
"title": normalized_track["name"],

@ -198,6 +198,12 @@ def _normalize_album_result(album: Any, source: str) -> Dict[str, Any]:
).strip()
release_date = str(_extract_value(album, "release_date", "releaseDate", default="") or "").strip()
album_type = str(_extract_value(album, "album_type", "type", default="album") or "album").strip() or "album"
release_format = str(_extract_value(album, "format", "release_format", default="") or "").strip()
country = str(_extract_value(album, "country", default="") or "").strip()
status = str(_extract_value(album, "status", default="") or "").strip()
label = str(_extract_value(album, "label", default="") or "").strip()
disambiguation = str(_extract_value(album, "disambiguation", default="") or "").strip()
release_group_id = str(_extract_value(album, "release_group_id", "releaseGroupId", default="") or "").strip()
total_tracks = _extract_value(album, "total_tracks", "track_count", default=0)
if isinstance(total_tracks, (list, tuple, set)):
@ -225,7 +231,7 @@ def _normalize_album_result(album: Any, source: str) -> Dict[str, Any]:
else:
image_url = _extract_value(first_image, "url", "image_url", "src", default="")
return {
suggestion = {
"id": album_id or album_name or "unknown-album",
"name": album_name or album_id or "Unknown Album",
"artist": artist_name or "Unknown Artist",
@ -235,14 +241,30 @@ def _normalize_album_result(album: Any, source: str) -> Dict[str, Any]:
"album_type": album_type,
"source": source,
}
def _album_fingerprint(album: Dict[str, Any]) -> Tuple[str, str, str, str]:
if release_format:
suggestion["format"] = release_format
if country:
suggestion["country"] = country
if status:
suggestion["status"] = status
if label:
suggestion["label"] = label
if disambiguation:
suggestion["disambiguation"] = disambiguation
if release_group_id:
suggestion["release_group_id"] = release_group_id
return suggestion
def _album_fingerprint(album: Dict[str, Any]) -> Tuple[str, ...]:
if album.get("source") == "musicbrainz" and album.get("id"):
return ("musicbrainz", str(album.get("id", "") or "").strip().casefold())
return (
str(album.get("name", "") or "").strip().casefold(),
str(album.get("artist", "") or "").strip().casefold(),
str(album.get("release_date", "") or "").strip()[:10].casefold(),
str(album.get("album_type", "") or "").strip().casefold(),
str(album.get("total_tracks", "") or "").strip(),
)

@ -264,6 +264,11 @@ def _build_album_info_typed(album_data: Dict[str, Any], album_id: str,
if isinstance(first, dict):
ctx['image_url'] = first.get('url') or ctx.get('image_url')
for key in ('format', 'country', 'status', 'label', 'disambiguation', 'release_group_id'):
value = album_data.get(key)
if value:
ctx[key] = value
return ctx
@ -327,7 +332,7 @@ def _build_album_info_legacy(album_data: Any, album_id: str,
if not image_url:
image_url = _extract_lookup_value(album_data, 'image_url', 'thumb_url')
return {
album_info = {
'id': _extract_lookup_value(album_data, 'id', 'album_id', 'collectionId', 'release_id', default=album_id) or album_id,
'name': _extract_lookup_value(album_data, 'name', 'title', default=album_name or album_id) or album_name or album_id,
'artist': resolved_artist_name or '',
@ -345,6 +350,11 @@ def _build_album_info_legacy(album_data: Any, album_id: str,
),
'total_tracks': _extract_lookup_value(album_data, 'total_tracks', 'track_count', default=0) or 0,
}
for key in ('format', 'country', 'status', 'label', 'disambiguation', 'release_group_id'):
value = _extract_lookup_value(album_data, key, default='')
if value:
album_info[key] = value
return album_info
def _build_album_track_entry(track_item: Any, album_info: Dict[str, Any], source: str) -> Dict[str, Any]:

@ -298,6 +298,41 @@ class MusicBrainzClient:
logger.error(f"Error browsing release-groups for artist {artist_mbid}: {e}")
return []
@rate_limited
def browse_release_group_releases(self, release_group_mbid: str,
                                  limit: int = 100,
                                  offset: int = 0) -> List[Dict[str, Any]]:
    """Fetch the concrete releases that belong to one release-group.

    A release-group stands for the logical album, while each release is a
    specific edition (country, format, explicit/clean disambiguation, bonus
    tracks, track count). Manual import needs the editions so the user can
    pick the tracklist that matches what they own.

    Args:
        release_group_mbid: Identifier sent as the ``release-group`` filter.
        limit: Requested page size; values above 100 are reduced to 100.
        offset: Value sent as the ``offset`` query parameter.

    Returns:
        The ``releases`` list from the JSON response, or an empty list when
        anything in the request/decoding path raises (the error is logged).
    """
    try:
        query = {
            'release-group': release_group_mbid,
            'fmt': 'json',
            'limit': min(limit, 100),
            'offset': offset,
            'inc': 'artist-credits+media+labels+release-groups',
        }
        endpoint = f"{self.BASE_URL}/release"
        response = self.session.get(endpoint, params=query, timeout=10)
        response.raise_for_status()

        payload = response.json()
        releases = payload.get('releases', [])
        logger.debug(f"Browsed {len(releases)} releases for release-group {release_group_mbid}")
        return releases
    except Exception as exc:
        # Best-effort lookup: callers receive an empty list instead of an error.
        logger.error(f"Error browsing releases for release-group {release_group_mbid}: {exc}")
        return []
@rate_limited
def search_recordings_by_artist_mbid(self, artist_mbid: str,
limit: int = 100) -> List[Dict[str, Any]]:

@ -57,6 +57,12 @@ class Album:
album_type: str
image_url: Optional[str] = None
external_urls: Optional[Dict[str, str]] = None
format: Optional[str] = None
country: Optional[str] = None
status: Optional[str] = None
label: Optional[str] = None
disambiguation: Optional[str] = None
release_group_id: Optional[str] = None
def _cover_art_url(mbid: str, scope: str = 'release') -> Optional[str]:
@ -316,8 +322,102 @@ class MusicBrainzSearchClient:
album_type=album_type,
image_url=image_url,
external_urls={'musicbrainz': f'https://musicbrainz.org/release-group/{rg_mbid}'} if rg_mbid else {},
disambiguation=rg.get('disambiguation') or None,
release_group_id=rg_mbid or None,
)
def _release_total_tracks(self, release: Dict[str, Any]) -> int:
total_tracks = 0
for medium in release.get('media', []) or []:
try:
total_tracks += int(medium.get('track-count') or 0)
except (TypeError, ValueError):
pass
return total_tracks
def _release_formats(self, release: Dict[str, Any]) -> str:
formats = []
for medium in release.get('media', []) or []:
fmt = (medium.get('format') or '').strip()
if fmt and fmt not in formats:
formats.append(fmt)
return ', '.join(formats)
def _release_label(self, release: Dict[str, Any]) -> str:
for info in release.get('label-info', []) or []:
label = (info.get('label') or {}) if isinstance(info, dict) else {}
name = (label.get('name') or '').strip()
if name:
return name
return ''
def _release_to_album(self, release: Dict[str, Any],
                      fallback_artist_name: Optional[str] = None) -> Optional[Album]:
    """Convert one MusicBrainz release mapping into an Album.

    Args:
        release: Release mapping as returned by the MusicBrainz client.
        fallback_artist_name: Artist used when the release has no usable
            artist credit.

    Returns:
        An Album keyed by the release id, or None when the release has no
        title.
    """
    mbid = release.get('id', '')
    name = release.get('title', '') or ''
    if not name:
        return None

    def _stripped(key: str) -> Optional[str]:
        # Blank or missing text fields become None.
        return (release.get(key) or '').strip() or None

    credited = _extract_artist_credit(release.get('artist-credit', []))
    if not credited and fallback_artist_name:
        credited = [fallback_artist_name]
    if not credited:
        credited = ['Unknown Artist']

    group = release.get('release-group', {}) or {}
    album_type = _map_release_type(
        group.get('primary-type', '') or '',
        group.get('secondary-types', []) or [],
    )
    rg_mbid = group.get('id', '') or release.get('release-group-id', '')
    cover = self._cached_art(mbid, rg_mbid)

    if mbid:
        links = {'musicbrainz': f'https://musicbrainz.org/release/{mbid}'}
    else:
        links = {}

    return Album(
        id=mbid,
        name=name,
        artists=credited,
        release_date=release.get('date', '') or '',
        total_tracks=self._release_total_tracks(release),
        album_type=album_type,
        image_url=cover,
        external_urls=links,
        format=self._release_formats(release) or None,
        country=_stripped('country'),
        status=_stripped('status'),
        label=self._release_label(release) or None,
        disambiguation=_stripped('disambiguation'),
        release_group_id=rg_mbid or None,
    )
def _release_variant_key(self, album: Album):
status_rank = 0 if (album.status or '').lower() == 'official' else 1
date = (album.release_date or '9999-99-99')[:10] or '9999-99-99'
track_rank = album.total_tracks or 9999
country_rank = 0 if (album.country or '') in ('XW', 'US', 'GB') else 1
return (
status_rank,
date,
country_rank,
track_rank,
album.format or '',
album.disambiguation or '',
album.id,
)
def _release_group_releases_to_albums(self, rg: Dict[str, Any], artist_name: str,
                                      limit: int) -> List[Album]:
    """Expand a release-group into Albums for its individual releases.

    Args:
        rg: Release-group mapping; its ``id`` selects the releases to browse.
        artist_name: Fallback artist for releases without an artist credit.
        limit: Maximum number of albums returned.

    Returns:
        Up to ``limit`` albums ordered by ``_release_variant_key``, or an
        empty list when the release-group has no id.
    """
    group_id = rg.get('id', '')
    if not group_id:
        return []

    # At least 25 releases are requested even for a smaller limit; the
    # result is trimmed to ``limit`` only after sorting.
    fetched = self._client.browse_release_group_releases(group_id, limit=max(limit, 25))

    converted = []
    for release in fetched:
        # Attach the parent group when the release does not carry its own.
        release.setdefault('release-group', rg)
        album = self._release_to_album(release, fallback_artist_name=artist_name)
        if album:
            converted.append(album)

    return sorted(converted, key=self._release_variant_key)[:limit]
def search_albums(self, query: str, limit: int = 10) -> List[Album]:
"""Search MusicBrainz for releases (albums).
@ -400,6 +500,13 @@ class MusicBrainzSearchClient:
matched = [rg for rg in rgs if hint_lower in (rg.get('title') or '').lower()]
if matched:
rgs = matched
expanded = []
for rg in rgs:
expanded.extend(self._release_group_releases_to_albums(rg, tname, limit))
if len(expanded) >= limit:
break
if expanded:
return expanded[:limit]
else:
fallback = self._search_albums_text(title_hint, tname, limit)
if fallback:
@ -436,63 +543,24 @@ class MusicBrainzSearchClient:
albums = []
for r in results:
mbid = r.get('id', '')
title = r.get('title', '')
if not title:
continue
artists = _extract_artist_credit(r.get('artist-credit', []))
release_date = r.get('date', '') or ''
# Track count from media
total_tracks = 0
media = r.get('media', [])
for m in media:
total_tracks += m.get('track-count', 0)
# Release type
rg = r.get('release-group', {})
primary_type = rg.get('primary-type', '') or ''
secondary_types = rg.get('secondary-types', []) or []
album_type = _map_release_type(primary_type, secondary_types)
# Cover art (non-blocking — skip if slow)
rg_mbid = rg.get('id', '')
image_url = self._cached_art(mbid, rg_mbid)
external_urls = {'musicbrainz': f'https://musicbrainz.org/release/{mbid}'} if mbid else {}
albums.append(Album(
id=mbid,
name=title,
artists=artists if artists else ['Unknown Artist'],
release_date=release_date,
total_tracks=total_tracks,
album_type=album_type,
image_url=image_url,
external_urls=external_urls,
))
# Deduplicate: keep best version of each title+artist combo
# (prefer ones with release dates and cover art)
seen = {}
deduped = []
album = self._release_to_album(r)
if album:
albums.append(album)
# Keep distinct MusicBrainz releases. The same title/artist/date
# can represent explicit, clean, regional, format, or bonus-track
# variants with different tracklists, which manual import must let
# the user choose.
seen_ids = set()
unique = []
for album in albums:
key = (album.name.lower().strip(), ', '.join(album.artists).lower().strip())
if key not in seen:
seen[key] = album
deduped.append(album)
else:
existing = seen[key]
# Prefer: has date > no date, has art > no art
better = False
if not existing.release_date and album.release_date:
better = True
elif not existing.image_url and album.image_url:
better = True
if better:
deduped[deduped.index(existing)] = album
seen[key] = album
return deduped
if album.id and album.id in seen_ids:
continue
if album.id:
seen_ids.add(album.id)
unique.append(album)
unique.sort(key=self._release_variant_key)
return unique[:limit]
except Exception as e:
logger.warning(f"MusicBrainz album search failed: {e}")
return []
@ -1030,6 +1098,12 @@ class MusicBrainzSearchClient:
'images': images,
'tracks': tracks,
'external_urls': {'musicbrainz': f'https://musicbrainz.org/release/{release_mbid}'},
'format': self._release_formats(release),
'country': release.get('country') or '',
'status': release.get('status') or '',
'label': self._release_label(release),
'disambiguation': release.get('disambiguation') or '',
'release_group_id': rg_mbid,
}
def get_artist_albums(self, artist_mbid: str, album_type: str = 'album,single', limit: int = 200) -> List:

@ -105,6 +105,55 @@ def test_search_import_albums_falls_back_when_primary_has_no_results(monkeypatch
assert spotify_client.calls == [("Album Two", {"limit": 2, "allow_fallback": False})]
# Two MusicBrainz results share title, artist and release date but have
# different ids, track counts and disambiguation. search_import_albums must
# return both, in order, with the release metadata carried through.
def test_search_import_albums_preserves_musicbrainz_release_variants(monkeypatch):
musicbrainz_client = FakeClient([
SimpleNamespace(
id="rel-clean",
name="Shock Value",
artists=["Timbaland"],
release_date="2007-04-03",
total_tracks=17,
image_url="",
album_type="album",
format="CD",
country="US",
status="Official",
disambiguation="clean",
release_group_id="rg-shock",
),
SimpleNamespace(
id="rel-explicit",
name="Shock Value",
artists=["Timbaland"],
release_date="2007-04-03",
total_tracks=18,
image_url="",
album_type="album",
format="CD",
country="US",
status="Official",
disambiguation="explicit",
release_group_id="rg-shock",
),
])
# Make MusicBrainz the primary and only source, served by the fake client.
monkeypatch.setattr(import_staging, "get_primary_source", lambda: "musicbrainz")
monkeypatch.setattr(import_staging, "get_source_priority", lambda primary: [primary])
monkeypatch.setattr(import_staging, "get_client_for_source", lambda source: musicbrainz_client)
# Route the per-source search straight to the client's search_albums.
monkeypatch.setattr(
import_staging,
"_search_albums_for_source",
lambda source, client, query, limit=5: client.search_albums(query, limit=limit),
)
results = import_staging.search_import_albums("Timbaland Shock Value", limit=12)
# Both variants are present and keep their own ids and track counts.
assert [result["id"] for result in results] == ["rel-clean", "rel-explicit"]
assert [result["total_tracks"] for result in results] == [17, 18]
assert results[1]["disambiguation"] == "explicit"
assert results[1]["release_group_id"] == "rg-shock"
def test_search_import_tracks_prefers_primary_source(monkeypatch):
deezer_client = FakeClient([
SimpleNamespace(

@ -429,6 +429,58 @@ def test_search_albums_text_path_filters_by_score():
assert 'Bad' not in titles
# search_release is mocked to return two releases with the same title, date
# and release-group but different ids and track counts; search_albums must
# return both rather than collapsing them into one.
def test_search_albums_text_path_keeps_release_variants():
client = MusicBrainzSearchClient()
client._client = MagicMock()
client._client.search_release.return_value = [
{'id': 'rel-clean', 'title': 'Shock Value', 'score': 100,
'date': '2007-04-03', 'country': 'US', 'status': 'Official',
'disambiguation': 'clean',
'media': [{'format': 'CD', 'track-count': 17}],
'release-group': {'id': 'rg-shock', 'primary-type': 'Album'},
'artist-credit': [{'name': 'Timbaland'}]},
{'id': 'rel-explicit', 'title': 'Shock Value', 'score': 100,
'date': '2007-04-03', 'country': 'US', 'status': 'Official',
'disambiguation': 'explicit',
'media': [{'format': 'CD', 'track-count': 18}],
'release-group': {'id': 'rg-shock', 'primary-type': 'Album'},
'artist-credit': [{'name': 'Timbaland'}]},
]
albums = client.search_albums('Timbaland - Shock Value', limit=10)
# The 17-track release is expected ahead of the 18-track one.
assert [a.id for a in albums] == ['rel-clean', 'rel-explicit']
assert [a.total_tracks for a in albums] == [17, 18]
assert albums[1].disambiguation == 'explicit'
# The artist search, release-group browse and release browse are all mocked.
# search_albums must return the two concrete releases of the matched
# release-group instead of the release-group itself.
def test_search_albums_title_hint_expands_release_group_to_releases():
client = MusicBrainzSearchClient()
client._client = MagicMock()
client._client.search_artist.return_value = [_mk_artist('Spiderbait', 'artist-spiderbait', score=100)]
client._client.browse_artist_release_groups.return_value = [
{'id': 'rg-tonight', 'title': 'Tonight Alright', 'primary-type': 'Album',
'first-release-date': '2004-03-29', 'secondary-types': []},
]
# The mocked releases carry no 'release-group' key of their own.
client._client.browse_release_group_releases.return_value = [
{'id': 'rel-cd', 'title': 'Tonight Alright', 'date': '2004-03-29',
'country': 'AU', 'status': 'Official',
'media': [{'format': 'CD', 'track-count': 12}],
'artist-credit': [{'name': 'Spiderbait'}]},
{'id': 'rel-vinyl', 'title': 'Tonight Alright', 'date': '2024-07-26',
'country': 'AU', 'status': 'Official',
'media': [{'format': '12\" Vinyl', 'track-count': 13}],
'artist-credit': [{'name': 'Spiderbait'}]},
]
albums = client.search_albums('Spiderbait Tonight Alright', limit=10)
# A search limit of 10 is expected to reach the browse call as limit=25.
client._client.browse_release_group_releases.assert_called_once_with('rg-tonight', limit=25)
# The 2004 CD is expected ahead of the 2024 vinyl.
assert [a.id for a in albums] == ['rel-cd', 'rel-vinyl']
assert [a.total_tracks for a in albums] == [12, 13]
assert albums[0].format == 'CD'
# ---------------------------------------------------------------------------
# Track search — routing
# ---------------------------------------------------------------------------

@ -650,11 +650,23 @@ function _renderSuggestionCard(a, primarySource) {
const sourceBadge = (a.source && primarySource && a.source !== primarySource)
? `<div class="import-page-album-card-source">via ${_esc((SOURCE_LABELS[a.source] || {}).text || a.source)}</div>`
: '';
const metaParts = [
`${a.total_tracks || 0} tracks`,
a.release_date ? a.release_date.substring(0, 4) : '',
a.format || '',
a.country || '',
a.disambiguation || '',
].filter(Boolean);
const details = [a.status || '', a.label || ''].filter(Boolean);
const detailsLine = details.length
? `<div class="import-page-album-card-detail">${_esc(details.join(' · '))}</div>`
: '';
return `<div class="import-page-album-card" onclick="importPageSelectAlbum('${_escAttr(a.id)}')">
<img src="${a.image_url || '/static/placeholder-album.png'}" alt="${_escAttr(a.name)}" loading="lazy" onerror="this.src='/static/placeholder-album.png'">
<div class="import-page-album-card-title" title="${_escAttr(a.name)}">${_esc(a.name)}</div>
<div class="import-page-album-card-artist" title="${_escAttr(a.artist)}">${_esc(a.artist)}</div>
<div class="import-page-album-card-meta">${a.total_tracks} tracks · ${a.release_date ? a.release_date.substring(0, 4) : ''}</div>
<div class="import-page-album-card-meta">${_esc(metaParts.join(' · '))}</div>
${detailsLine}
${sourceBadge}
</div>`;
}
@ -744,12 +756,19 @@ async function importPageSelectAlbum(albumId) {
// Render hero
const album = data.album;
const heroMetaParts = [
`${album.total_tracks || 0} tracks`,
album.release_date ? album.release_date.substring(0, 4) : '',
album.format || '',
album.country || '',
album.disambiguation || '',
].filter(Boolean);
document.getElementById('import-page-album-hero').innerHTML = `
<img src="${album.image_url || '/static/placeholder-album.png'}" alt="${_escAttr(album.name)}" loading="lazy" onerror="this.src='/static/placeholder-album.png'">
<div class="import-page-album-hero-info">
<div class="import-page-album-hero-title">${_esc(album.name)}</div>
<div class="import-page-album-hero-artist">${_esc(album.artist)}</div>
<div class="import-page-album-hero-meta">${album.total_tracks} tracks · ${album.release_date ? album.release_date.substring(0, 4) : ''}</div>
<div class="import-page-album-hero-meta">${_esc(heroMetaParts.join(' · '))}</div>
</div>
`;

@ -40322,6 +40322,18 @@ div.artist-hero-badge {
font-size: 10px;
color: rgba(255, 255, 255, 0.3);
margin-top: 4px;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
/* Detail line on an import album card: small, dimmed text kept on a single
   line and truncated with an ellipsis when it overflows. */
.import-page-album-card-detail {
font-size: 10px;
color: rgba(255, 255, 255, 0.36);
margin-top: 2px;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
.import-page-album-card-source {

Loading…
Cancel
Save