From 258fc39364342e74db26367f5bf49ace20fc5fa7 Mon Sep 17 00:00:00 2001 From: Broque Thomas <26755000+Nezreka@users.noreply.github.com> Date: Thu, 16 Apr 2026 13:29:04 -0700 Subject: [PATCH] Picard-style release preference scoring for MusicBrainz matching Replaced track-count-only release selection with deterministic scoring across 6 factors: track count match (40pts), release status (10pts), country preference with US/worldwide bias (10pts), format preference favoring Digital/CD over Vinyl/Cassette (10pts), barcode presence (3pts), and date completeness (2pts). Same inputs always produce the same release. Also fixed critical bug: _embed_source_ids was missing the context parameter, silently skipping ALL source ID tag embedding since the MusicBrainz consistency commit. Now passes context from the caller. --- core/album_consistency.py | 243 ++++++++++++++++++++++++-------------- 1 file changed, 154 insertions(+), 89 deletions(-) diff --git a/core/album_consistency.py b/core/album_consistency.py index b89ac0f8..e913b8cc 100644 --- a/core/album_consistency.py +++ b/core/album_consistency.py @@ -57,6 +57,86 @@ _ID3_TXXX_MAP = { # MP4 freeform keys _MP4_KEY_PREFIX = '----:com.apple.iTunes:' +# ── Picard-style release preference scoring ── +# Preferred countries (higher = better). US/GB/XW(worldwide) are most common +# for English-language music. XE = Europe-wide. +_COUNTRY_SCORES = { + 'US': 10, 'XW': 10, 'GB': 8, 'XE': 7, 'CA': 6, 'AU': 5, 'DE': 4, + 'FR': 4, 'JP': 3, 'NL': 3, 'SE': 3, 'IT': 2, +} + +# Preferred formats (higher = better). Digital/CD are the standard; +# vinyl and cassette are niche reissues that often differ from the +# canonical tracklist. +_FORMAT_SCORES = { + 'Digital Media': 10, 'CD': 9, 'Enhanced CD': 8, + 'SACD': 7, 'Hybrid SACD': 7, 'Blu-spec CD': 7, + 'Vinyl': 3, '12" Vinyl': 3, '7" Vinyl': 2, + 'Cassette': 1, +} + +# Release status preference +_STATUS_SCORES = { + 'Official': 10, 'Promotion': 5, 'Bootleg': 1, 'Pseudo-Release': 1, +} + + +def _score_release(release: dict, expected_track_count: int) -> float: + """Score a MusicBrainz release for preference ranking. + + Higher score = better candidate. Factors: + - Track count match (most important — wrong count is wrong release) + - Release status (Official > Promo > Bootleg) + - Country preference (US/worldwide > regional) + - Format preference (Digital/CD > Vinyl > Cassette) + - Has barcode (sign of a real commercial release) + - Penalize releases with no media info (incomplete data) + """ + score = 0.0 + + # Track count match (0-40 points, biggest factor) + media = release.get('media', []) + mb_track_count = sum(len(m.get('tracks') or m.get('track-list', [])) + for m in media) + track_diff = abs(mb_track_count - expected_track_count) + if track_diff == 0: + score += 40 + elif track_diff <= 1: + score += 30 + elif track_diff <= 2: + score += 20 + elif track_diff <= 5: + score += 10 + # else: 0 points + + # Status (0-10 points) + status = release.get('status', '') + score += _STATUS_SCORES.get(status, 2) + + # Country (0-10 points) + country = release.get('country', '') + score += _COUNTRY_SCORES.get(country, 1) + + # Format from first medium (0-10 points) + if media: + fmt = media[0].get('format', '') + score += _FORMAT_SCORES.get(fmt, 4) + else: + score -= 5 # No media info = suspect + + # Barcode (0-3 points) — real commercial releases have barcodes + if release.get('barcode'): + score += 3 + + # Date completeness (0-2 points) — prefer releases with full dates + date = release.get('date', '') + if len(date) >= 10: + score += 2 # Full YYYY-MM-DD + elif len(date) >= 4: + score += 1 # Year only + + return score + def _normalize_title(s): """Normalize a title for comparison.""" @@ -71,106 +151,91 @@ def _normalize_title(s): def _find_best_release(album_name, artist_name, track_count, mb_service): """Search MusicBrainz for the best release matching this album. - Prefers releases where track count matches the download.""" + + Uses Picard-style preference scoring: track count match, release status, + country (US/worldwide preferred), format (Digital/CD preferred), barcode + presence, and date completeness. Deterministic — same inputs always + produce the same release. + """ try: - # First try our existing match_release (uses version qualifier scoring) - match = mb_service.match_release(album_name, artist_name) - if not match or not match.get('mbid'): - # Try stripping edition qualifiers — Spotify uses "Album (Super Deluxe)" - # but MusicBrainz just calls it "Album" - import re - stripped = re.sub( - r'\s*[\(\[]' - r'[^)\]]*' - r'(?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector|' - r'limited|bonus|platinum|gold|super\s*deluxe|standard|edition)' - r'[^)\]]*' - r'[\)\]]', - '', album_name, flags=re.IGNORECASE - ).strip() - # Also strip trailing bare editions - stripped = re.sub( - r'\s+(?:-\s+)?(?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector|' - r'limited|bonus|platinum|gold|super\s*deluxe|standard)' - r'(?:\s+(?:edition|version))?\s*$', - '', stripped, flags=re.IGNORECASE - ).strip() - - if stripped and stripped.lower() != album_name.lower(): - logger.info(f"Retrying MB search with stripped name: '{stripped}' (was '{album_name}')") - match = mb_service.match_release(stripped, artist_name) - - if not match or not match.get('mbid'): - # Final fallback: direct API search with stripped name - search_name = stripped or album_name - logger.info(f"No cached MB release — trying direct search for '{search_name}'") - search_results = mb_service.mb_client.search_release(search_name, artist_name, limit=5) - if not search_results: - logger.info(f"No MB release found for '{album_name}' by '{artist_name}'") - return None - mbid = search_results[0].get('id', '') - if not mbid: - return None - logger.info(f"Direct search found: {search_results[0].get('title', '')} ({mbid[:8]}...)") - else: - mbid = match['mbid'] - else: - mbid = match['mbid'] + import re + + # Build search name variants + search_names = [album_name] + stripped = re.sub( + r'\s*[\(\[]' + r'[^)\]]*' + r'(?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector|' + r'limited|bonus|platinum|gold|super\s*deluxe|standard|edition)' + r'[^)\]]*' + r'[\)\]]', + '', album_name, flags=re.IGNORECASE + ).strip() + stripped = re.sub( + r'\s+(?:-\s+)?(?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector|' + r'limited|bonus|platinum|gold|super\s*deluxe|standard)' + r'(?:\s+(?:edition|version))?\s*$', + '', stripped, flags=re.IGNORECASE + ).strip() + if stripped and stripped.lower() != album_name.lower(): + search_names.append(stripped) + + # Collect candidate release MBIDs from all search variants + candidate_mbids = [] + for name in search_names: + # Try cached match first + match = mb_service.match_release(name, artist_name) + if match and match.get('mbid'): + candidate_mbids.append(match['mbid']) + + # Also try direct search for more candidates + try: + search_results = mb_service.mb_client.search_release(name, artist_name, limit=5) + for sr in (search_results or []): + sr_id = sr.get('id', '') + if sr_id and sr_id not in candidate_mbids: + candidate_mbids.append(sr_id) + except Exception: + pass - # Fetch full release with tracklist - release = mb_service.mb_client.get_release( - mbid, includes=['recordings', 'release-groups', 'labels', 'media', 'artist-credits'] - ) - if not release: + if not candidate_mbids: + logger.info(f"No MB release found for '{album_name}' by '{artist_name}'") return None - # Check track count match - mb_track_count = sum(len(m.get('tracks') or m.get('track-list', [])) for m in release.get('media', [])) - if abs(mb_track_count - track_count) <= 2: - logger.info(f"Accepted release '{release.get('title')}' ({mbid[:8]}...) — " - f"{mb_track_count} tracks (downloaded {track_count})") - return release + # Fetch full release data for each candidate and score them + best_release = None + best_score = -1 - # Track count mismatch — try searching for a better release - # Use stripped name for search (MB often doesn't include edition suffixes) - import re - _search_name = re.sub( - r'\s*[\(\[][^)\]]*(?:deluxe|expanded|remaster|anniversary|special|collector|' - r'limited|bonus|platinum|gold|super\s*deluxe|standard|edition)[^)\]]*[\)\]]', - '', album_name, flags=re.IGNORECASE - ).strip() or album_name - logger.info(f"Release '{release.get('title')}' has {mb_track_count} tracks but we have {track_count} — " - f"searching for better match with '{_search_name}'") - search_results = mb_service.mb_client.search_release(_search_name, artist_name, limit=5) - if not search_results: - # Fall back to the first match even if count doesn't match perfectly - return release - - best_release = release - best_diff = abs(mb_track_count - track_count) - - for sr in search_results: - sr_mbid = sr.get('id', '') - if not sr_mbid or sr_mbid == mbid: - continue + for mbid in candidate_mbids[:8]: # Cap at 8 to limit API calls try: - candidate = mb_service.mb_client.get_release( - sr_mbid, includes=['recordings', 'release-groups', 'labels', 'media', 'artist-credits'] + release = mb_service.mb_client.get_release( + mbid, includes=['recordings', 'release-groups', 'labels', + 'media', 'artist-credits'] ) - if not candidate: + if not release: continue - cand_count = sum(len(m.get('tracks') or m.get('track-list', [])) for m in candidate.get('media', [])) - cand_diff = abs(cand_count - track_count) - if cand_diff < best_diff: - best_diff = cand_diff - best_release = candidate - if cand_diff == 0: - break # Perfect match + + score = _score_release(release, track_count) + + if score > best_score: + best_score = score + best_release = release + except Exception: continue - logger.info(f"Best release: '{best_release.get('title')}' ({best_release.get('id', '')[:8]}...) — " - f"track count diff: {best_diff}") + if best_release: + mb_count = sum(len(m.get('tracks') or m.get('track-list', [])) + for m in best_release.get('media', [])) + logger.info( + f"Selected release '{best_release.get('title')}' " + f"({best_release.get('id', '')[:8]}...) — " + f"score={best_score:.0f}, tracks={mb_count}, " + f"country={best_release.get('country', '?')}, " + f"format={best_release.get('media', [{}])[0].get('format', '?')}, " + f"status={best_release.get('status', '?')}" + ) + return best_release except Exception as e: