@ -57,6 +57,86 @@ _ID3_TXXX_MAP = {
# MP4 freeform keys
_MP4_KEY_PREFIX = ' ----:com.apple.iTunes: '
# ── Picard-style release preference scoring ──
# Preferred countries (higher = better). US/GB/XW(worldwide) are most common
# for English-language music. XE = Europe-wide.
_COUNTRY_SCORES = {
' US ' : 10 , ' XW ' : 10 , ' GB ' : 8 , ' XE ' : 7 , ' CA ' : 6 , ' AU ' : 5 , ' DE ' : 4 ,
' FR ' : 4 , ' JP ' : 3 , ' NL ' : 3 , ' SE ' : 3 , ' IT ' : 2 ,
}
# Preferred formats (higher = better). Digital/CD are the standard;
# vinyl and cassette are niche reissues that often differ from the
# canonical tracklist.
_FORMAT_SCORES = {
' Digital Media ' : 10 , ' CD ' : 9 , ' Enhanced CD ' : 8 ,
' SACD ' : 7 , ' Hybrid SACD ' : 7 , ' Blu-spec CD ' : 7 ,
' Vinyl ' : 3 , ' 12 " Vinyl ' : 3 , ' 7 " Vinyl ' : 2 ,
' Cassette ' : 1 ,
}
# Release status preference
_STATUS_SCORES = {
' Official ' : 10 , ' Promotion ' : 5 , ' Bootleg ' : 1 , ' Pseudo-Release ' : 1 ,
}
def _score_release ( release : dict , expected_track_count : int ) - > float :
""" Score a MusicBrainz release for preference ranking.
Higher score = better candidate . Factors :
- Track count match ( most important — wrong count is wrong release )
- Release status ( Official > Promo > Bootleg )
- Country preference ( US / worldwide > regional )
- Format preference ( Digital / CD > Vinyl > Cassette )
- Has barcode ( sign of a real commercial release )
- Penalize releases with no media info ( incomplete data )
"""
score = 0.0
# Track count match (0-40 points, biggest factor)
media = release . get ( ' media ' , [ ] )
mb_track_count = sum ( len ( m . get ( ' tracks ' ) or m . get ( ' track-list ' , [ ] ) )
for m in media )
track_diff = abs ( mb_track_count - expected_track_count )
if track_diff == 0 :
score + = 40
elif track_diff < = 1 :
score + = 30
elif track_diff < = 2 :
score + = 20
elif track_diff < = 5 :
score + = 10
# else: 0 points
# Status (0-10 points)
status = release . get ( ' status ' , ' ' )
score + = _STATUS_SCORES . get ( status , 2 )
# Country (0-10 points)
country = release . get ( ' country ' , ' ' )
score + = _COUNTRY_SCORES . get ( country , 1 )
# Format from first medium (0-10 points)
if media :
fmt = media [ 0 ] . get ( ' format ' , ' ' )
score + = _FORMAT_SCORES . get ( fmt , 4 )
else :
score - = 5 # No media info = suspect
# Barcode (0-3 points) — real commercial releases have barcodes
if release . get ( ' barcode ' ) :
score + = 3
# Date completeness (0-2 points) — prefer releases with full dates
date = release . get ( ' date ' , ' ' )
if len ( date ) > = 10 :
score + = 2 # Full YYYY-MM-DD
elif len ( date ) > = 4 :
score + = 1 # Year only
return score
def _normalize_title ( s ) :
""" Normalize a title for comparison. """
@ -71,106 +151,91 @@ def _normalize_title(s):
def _find_best_release ( album_name , artist_name , track_count , mb_service ) :
""" Search MusicBrainz for the best release matching this album.
Prefers releases where track count matches the download . """
Uses Picard - style preference scoring : track count match , release status ,
country ( US / worldwide preferred ) , format ( Digital / CD preferred ) , barcode
presence , and date completeness . Deterministic — same inputs always
produce the same release .
"""
try :
# First try our existing match_release (uses version qualifier scoring)
match = mb_service . match_release ( album_name , artist_name )
if not match or not match . get ( ' mbid ' ) :
# Try stripping edition qualifiers — Spotify uses "Album (Super Deluxe)"
# but MusicBrainz just calls it "Album"
import re
stripped = re . sub (
r ' \ s*[ \ ( \ [] '
r ' [^) \ ]]* '
r ' (?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector| '
r ' limited|bonus|platinum|gold|super \ s*deluxe|standard|edition) '
r ' [^) \ ]]* '
r ' [ \ ) \ ]] ' ,
' ' , album_name , flags = re . IGNORECASE
) . strip ( )
# Also strip trailing bare editions
stripped = re . sub (
r ' \ s+(?:- \ s+)?(?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector| '
r ' limited|bonus|platinum|gold|super \ s*deluxe|standard) '
r ' (?: \ s+(?:edition|version))? \ s*$ ' ,
' ' , stripped , flags = re . IGNORECASE
) . strip ( )
if stripped and stripped . lower ( ) != album_name . lower ( ) :
logger . info ( f " Retrying MB search with stripped name: ' { stripped } ' (was ' { album_name } ' ) " )
match = mb_service . match_release ( stripped , artist_name )
if not match or not match . get ( ' mbid ' ) :
# Final fallback: direct API search with stripped name
search_name = stripped or album_name
logger . info ( f " No cached MB release — trying direct search for ' { search_name } ' " )
search_results = mb_service . mb_client . search_release ( search_name , artist_name , limit = 5 )
if not search_results :
logger . info ( f " No MB release found for ' { album_name } ' by ' { artist_name } ' " )
return None
mbid = search_results [ 0 ] . get ( ' id ' , ' ' )
if not mbid :
return None
logger . info ( f " Direct search found: { search_results [ 0 ] . get ( ' title ' , ' ' ) } ( { mbid [ : 8 ] } ...) " )
else :
mbid = match [ ' mbid ' ]
else :
mbid = match [ ' mbid ' ]
import re
# Build search name variants
search_names = [ album_name ]
stripped = re . sub (
r ' \ s*[ \ ( \ [] '
r ' [^) \ ]]* '
r ' (?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector| '
r ' limited|bonus|platinum|gold|super \ s*deluxe|standard|edition) '
r ' [^) \ ]]* '
r ' [ \ ) \ ]] ' ,
' ' , album_name , flags = re . IGNORECASE
) . strip ( )
stripped = re . sub (
r ' \ s+(?:- \ s+)?(?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector| '
r ' limited|bonus|platinum|gold|super \ s*deluxe|standard) '
r ' (?: \ s+(?:edition|version))? \ s*$ ' ,
' ' , stripped , flags = re . IGNORECASE
) . strip ( )
if stripped and stripped . lower ( ) != album_name . lower ( ) :
search_names . append ( stripped )
# Collect candidate release MBIDs from all search variants
candidate_mbids = [ ]
for name in search_names :
# Try cached match first
match = mb_service . match_release ( name , artist_name )
if match and match . get ( ' mbid ' ) :
candidate_mbids . append ( match [ ' mbid ' ] )
# Also try direct search for more candidates
try :
search_results = mb_service . mb_client . search_release ( name , artist_name , limit = 5 )
for sr in ( search_results or [ ] ) :
sr_id = sr . get ( ' id ' , ' ' )
if sr_id and sr_id not in candidate_mbids :
candidate_mbids . append ( sr_id )
except Exception :
pass
# Fetch full release with tracklist
release = mb_service . mb_client . get_release (
mbid , includes = [ ' recordings ' , ' release-groups ' , ' labels ' , ' media ' , ' artist-credits ' ]
)
if not release :
if not candidate_mbids :
logger . info ( f " No MB release found for ' { album_name } ' by ' { artist_name } ' " )
return None
# Check track count match
mb_track_count = sum ( len ( m . get ( ' tracks ' ) or m . get ( ' track-list ' , [ ] ) ) for m in release . get ( ' media ' , [ ] ) )
if abs ( mb_track_count - track_count ) < = 2 :
logger . info ( f " Accepted release ' { release . get ( ' title ' ) } ' ( { mbid [ : 8 ] } ...) — "
f " { mb_track_count } tracks (downloaded { track_count } ) " )
return release
# Fetch full release data for each candidate and score them
best_release = None
best_score = - 1
# Track count mismatch — try searching for a better release
# Use stripped name for search (MB often doesn't include edition suffixes)
import re
_search_name = re . sub (
r ' \ s*[ \ ( \ [][^) \ ]]*(?:deluxe|expanded|remaster|anniversary|special|collector| '
r ' limited|bonus|platinum|gold|super \ s*deluxe|standard|edition)[^) \ ]]*[ \ ) \ ]] ' ,
' ' , album_name , flags = re . IGNORECASE
) . strip ( ) or album_name
logger . info ( f " Release ' { release . get ( ' title ' ) } ' has { mb_track_count } tracks but we have { track_count } — "
f " searching for better match with ' { _search_name } ' " )
search_results = mb_service . mb_client . search_release ( _search_name , artist_name , limit = 5 )
if not search_results :
# Fall back to the first match even if count doesn't match perfectly
return release
best_release = release
best_diff = abs ( mb_track_count - track_count )
for sr in search_results :
sr_mbid = sr . get ( ' id ' , ' ' )
if not sr_mbid or sr_mbid == mbid :
continue
for mbid in candidate_mbids [ : 8 ] : # Cap at 8 to limit API calls
try :
candidate = mb_service . mb_client . get_release (
sr_mbid , includes = [ ' recordings ' , ' release-groups ' , ' labels ' , ' media ' , ' artist-credits ' ]
release = mb_service . mb_client . get_release (
mbid , includes = [ ' recordings ' , ' release-groups ' , ' labels ' ,
' media ' , ' artist-credits ' ]
)
if not candidat e:
if not release :
continue
cand_count = sum ( len ( m . get ( ' tracks ' ) or m . get ( ' track-list ' , [ ] ) ) for m in candidate . get ( ' media ' , [ ] ) )
cand_diff = abs ( cand_count - track_count )
if cand_diff < best_diff :
best_diff = cand_diff
best_ release = candidat e
if cand_diff == 0 :
break # Perfect match
score = _score_release ( release , track_count )
if score > best_score :
best_score = score
best_release = release
except Exception :
continue
logger . info ( f " Best release: ' { best_release . get ( ' title ' ) } ' ( { best_release . get ( ' id ' , ' ' ) [ : 8 ] } ...) — "
f " track count diff: { best_diff } " )
if best_release :
mb_count = sum ( len ( m . get ( ' tracks ' ) or m . get ( ' track-list ' , [ ] ) )
for m in best_release . get ( ' media ' , [ ] ) )
logger . info (
f " Selected release ' { best_release . get ( ' title ' ) } ' "
f " ( { best_release . get ( ' id ' , ' ' ) [ : 8 ] } ...) — "
f " score= { best_score : .0f } , tracks= { mb_count } , "
f " country= { best_release . get ( ' country ' , ' ? ' ) } , "
f " format= { best_release . get ( ' media ' , [ { } ] ) [ 0 ] . get ( ' format ' , ' ? ' ) } , "
f " status= { best_release . get ( ' status ' , ' ? ' ) } "
)
return best_release
except Exception as e :