You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/album_consistency.py

516 lines
18 KiB

"""Picard-style Album Consistency — after all tracks in an album batch finish
post-processing, pick ONE MusicBrainz release and overwrite album-level tags
on every file so they're consistent. Prevents media server album splits.
"""
import os
import threading
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional
from mutagen import File as MutagenFile
from mutagen.flac import FLAC
from mutagen.id3 import ID3, TALB, TPE2, TXXX
from mutagen.mp4 import MP4, MP4FreeForm
from mutagen.oggvorbis import OggVorbis
from utils.logging_config import get_logger
logger = get_logger("album_consistency")
# Tags written to EVERY file (album-level, same value)
_ALBUM_LEVEL_TAGS = [
'MUSICBRAINZ_RELEASE_ID',
'MUSICBRAINZ_RELEASEGROUPID',
'MUSICBRAINZ_ALBUMARTISTID',
'RELEASETYPE',
'RELEASESTATUS',
'RELEASECOUNTRY',
'ORIGINALDATE',
'BARCODE',
'MEDIA',
'TOTALDISCS',
'CATALOGNUMBER',
'SCRIPT',
'ASIN',
]
# Vorbis comment keys (FLAC/OGG) — same as _ALBUM_LEVEL_TAGS (uppercase)
# ID3 TXXX desc mapping
_ID3_TXXX_MAP = {
'MUSICBRAINZ_RELEASE_ID': 'MusicBrainz Album Id',
'MUSICBRAINZ_RELEASEGROUPID': 'MusicBrainz Release Group Id',
'MUSICBRAINZ_ALBUMARTISTID': 'MusicBrainz Album Artist Id',
'MUSICBRAINZ_RELEASETRACKID': 'MusicBrainz Release Track Id',
'RELEASETYPE': 'MusicBrainz Album Type',
'RELEASESTATUS': 'MusicBrainz Album Status',
'RELEASECOUNTRY': 'MusicBrainz Album Release Country',
'ORIGINALDATE': 'ORIGINALDATE',
'BARCODE': 'BARCODE',
'MEDIA': 'MEDIA',
'TOTALDISCS': 'TOTALDISCS',
'CATALOGNUMBER': 'CATALOGNUMBER',
'SCRIPT': 'SCRIPT',
'ASIN': 'ASIN',
}
# MP4 freeform keys
_MP4_KEY_PREFIX = '----:com.apple.iTunes:'
# ── Picard-style release preference scoring ──
# Preferred countries (higher = better). US/GB/XW(worldwide) are most common
# for English-language music. XE = Europe-wide.
_COUNTRY_SCORES = {
'US': 10, 'XW': 10, 'GB': 8, 'XE': 7, 'CA': 6, 'AU': 5, 'DE': 4,
'FR': 4, 'JP': 3, 'NL': 3, 'SE': 3, 'IT': 2,
}
# Preferred formats (higher = better). Digital/CD are the standard;
# vinyl and cassette are niche reissues that often differ from the
# canonical tracklist.
_FORMAT_SCORES = {
'Digital Media': 10, 'CD': 9, 'Enhanced CD': 8,
'SACD': 7, 'Hybrid SACD': 7, 'Blu-spec CD': 7,
'Vinyl': 3, '12" Vinyl': 3, '7" Vinyl': 2,
'Cassette': 1,
}
# Release status preference
_STATUS_SCORES = {
'Official': 10, 'Promotion': 5, 'Bootleg': 1, 'Pseudo-Release': 1,
}
def _score_release(release: dict, expected_track_count: int) -> float:
"""Score a MusicBrainz release for preference ranking.
Higher score = better candidate. Factors:
- Track count match (most important — wrong count is wrong release)
- Release status (Official > Promo > Bootleg)
- Country preference (US/worldwide > regional)
- Format preference (Digital/CD > Vinyl > Cassette)
- Has barcode (sign of a real commercial release)
- Penalize releases with no media info (incomplete data)
"""
score = 0.0
# Track count match (0-40 points, biggest factor)
media = release.get('media', [])
mb_track_count = sum(len(m.get('tracks') or m.get('track-list', []))
for m in media)
track_diff = abs(mb_track_count - expected_track_count)
if track_diff == 0:
score += 40
elif track_diff <= 1:
score += 30
elif track_diff <= 2:
score += 20
elif track_diff <= 5:
score += 10
# else: 0 points
# Status (0-10 points)
status = release.get('status', '')
score += _STATUS_SCORES.get(status, 2)
# Country (0-10 points)
country = release.get('country', '')
score += _COUNTRY_SCORES.get(country, 1)
# Format from first medium (0-10 points)
if media:
fmt = media[0].get('format', '')
score += _FORMAT_SCORES.get(fmt, 4)
else:
score -= 5 # No media info = suspect
# Barcode (0-3 points) — real commercial releases have barcodes
if release.get('barcode'):
score += 3
# Date completeness (0-2 points) — prefer releases with full dates
date = release.get('date', '')
if len(date) >= 10:
score += 2 # Full YYYY-MM-DD
elif len(date) >= 4:
score += 1 # Year only
return score
def _normalize_title(s):
"""Normalize a title for comparison."""
import re
if not s:
return ''
s = s.lower().strip()
s = re.sub(r'\s*[\(\[].*?[\)\]]\s*', ' ', s) # Strip parentheticals/brackets
s = re.sub(r'[^\w\s]', '', s) # Strip punctuation
return ' '.join(s.split())
def _find_best_release(album_name, artist_name, track_count, mb_service):
"""Search MusicBrainz for the best release matching this album.
Uses Picard-style preference scoring: track count match, release status,
country (US/worldwide preferred), format (Digital/CD preferred), barcode
presence, and date completeness. Deterministic — same inputs always
produce the same release.
"""
try:
import re
# Build search name variants
search_names = [album_name]
stripped = re.sub(
r'\s*[\(\[]'
r'[^)\]]*'
r'(?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector|'
r'limited|bonus|platinum|gold|super\s*deluxe|standard|edition)'
r'[^)\]]*'
r'[\)\]]',
'', album_name, flags=re.IGNORECASE
).strip()
stripped = re.sub(
r'\s+(?:-\s+)?(?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector|'
r'limited|bonus|platinum|gold|super\s*deluxe|standard)'
r'(?:\s+(?:edition|version))?\s*$',
'', stripped, flags=re.IGNORECASE
).strip()
if stripped and stripped.lower() != album_name.lower():
search_names.append(stripped)
# Collect candidate release MBIDs from all search variants
candidate_mbids = []
for name in search_names:
# Try cached match first
match = mb_service.match_release(name, artist_name)
if match and match.get('mbid'):
candidate_mbids.append(match['mbid'])
# Also try direct search for more candidates
try:
search_results = mb_service.mb_client.search_release(name, artist_name, limit=5)
for sr in (search_results or []):
sr_id = sr.get('id', '')
if sr_id and sr_id not in candidate_mbids:
candidate_mbids.append(sr_id)
except Exception:
pass
if not candidate_mbids:
logger.info(f"No MB release found for '{album_name}' by '{artist_name}'")
return None
# Fetch full release data for each candidate and score them
best_release = None
best_score = -1
for mbid in candidate_mbids[:8]: # Cap at 8 to limit API calls
try:
release = mb_service.mb_client.get_release(
mbid, includes=['recordings', 'release-groups', 'labels',
'media', 'artist-credits']
)
if not release:
continue
score = _score_release(release, track_count)
if score > best_score:
best_score = score
best_release = release
except Exception:
continue
if best_release:
mb_count = sum(len(m.get('tracks') or m.get('track-list', []))
for m in best_release.get('media', []))
logger.info(
f"Selected release '{best_release.get('title')}' "
f"({best_release.get('id', '')[:8]}...) — "
f"score={best_score:.0f}, tracks={mb_count}, "
f"country={best_release.get('country', '?')}, "
f"format={best_release.get('media', [{}])[0].get('format', '?')}, "
f"status={best_release.get('status', '?')}"
)
return best_release
except Exception as e:
logger.error(f"Error finding best release for '{album_name}': {e}")
return None
def _match_files_to_tracklist(file_infos, release):
"""Match downloaded files to MB release tracklist entries.
Returns {file_path: mb_track_entry} for matched files."""
# Build MB tracklist lookup: (disc, track) -> track entry
mb_lookup = {}
for medium in release.get('media', []):
disc_num = medium.get('position', 1)
for track in (medium.get('tracks') or medium.get('track-list', [])):
pos = track.get('position', track.get('number', 0))
try:
pos = int(pos)
except (ValueError, TypeError):
continue
mb_lookup[(disc_num, pos)] = track
matched = {}
unmatched = []
# Pass 1: exact disc+track number match
for fi in file_infos:
key = (fi.get('disc_number', 1), fi.get('track_number', 1))
if key in mb_lookup:
matched[fi['path']] = mb_lookup[key]
else:
unmatched.append(fi)
# Pass 2: title similarity for unmatched
remaining_mb = {k: v for k, v in mb_lookup.items() if v not in matched.values()}
for fi in unmatched:
norm_title = _normalize_title(fi.get('title', ''))
best_score = 0
best_entry = None
for _key, mb_track in remaining_mb.items():
recording = mb_track.get('recording', {})
mb_title = _normalize_title(recording.get('title', ''))
if not mb_title:
continue
score = SequenceMatcher(None, norm_title, mb_title).ratio()
if score > best_score:
best_score = score
best_entry = mb_track
if best_entry and best_score >= 0.70:
matched[fi['path']] = best_entry
# Remove from remaining so it's not double-matched
remaining_mb = {k: v for k, v in remaining_mb.items() if v is not best_entry}
return matched
def _write_tag_to_file(audio, tag_key, value):
"""Write a single custom tag to an audio file (Mutagen object)."""
if value is None:
return
value = str(value)
try:
if isinstance(audio.tags, ID3):
desc = _ID3_TXXX_MAP.get(tag_key, tag_key)
# Remove existing TXXX with this desc
to_remove = [k for k in audio.tags if k.startswith('TXXX:') and desc in k]
for k in to_remove:
del audio.tags[k]
audio.tags.add(TXXX(encoding=3, desc=desc, text=[value]))
elif isinstance(audio, (FLAC, OggVorbis)):
audio[tag_key] = [value]
elif isinstance(audio, MP4):
key = _MP4_KEY_PREFIX + _ID3_TXXX_MAP.get(tag_key, tag_key)
audio[key] = [MP4FreeForm(value.encode('utf-8'))]
except Exception as e:
logger.debug(f"Failed to write {tag_key}: {e}")
def _write_standard_tag(audio, tag_name, value):
"""Write album/albumartist standard tags."""
if value is None:
return
try:
if isinstance(audio.tags, ID3):
if tag_name == 'album':
audio.tags.delall('TALB')
audio.tags.add(TALB(encoding=3, text=[value]))
elif tag_name == 'albumartist':
audio.tags.delall('TPE2')
audio.tags.add(TPE2(encoding=3, text=[value]))
elif isinstance(audio, (FLAC, OggVorbis)):
audio[tag_name.upper()] = [value]
elif isinstance(audio, MP4):
tag_map = {'album': '\xa9alb', 'albumartist': 'aART'}
key = tag_map.get(tag_name)
if key:
audio[key] = [value]
except Exception as e:
logger.debug(f"Failed to write standard tag {tag_name}: {e}")
def run_album_consistency(
file_infos: List[Dict[str, Any]],
album_name: str,
artist_name: str,
mb_service: Any,
total_discs: int = 1,
file_lock_fn=None,
) -> Dict[str, Any]:
"""
Picard-style album consistency: pick ONE MusicBrainz release for the album,
then overwrite album-level tags on all files to match.
Args:
file_infos: List of {path, track_number, disc_number, title}
album_name: Album name from download context
artist_name: Artist name from download context
mb_service: MusicBrainzService instance
total_discs: Number of discs in the album
file_lock_fn: Optional function(path) -> context manager for thread-safe writes
Returns:
{success, release_mbid, matched_tracks, total_files, tags_written, error}
"""
result = {
'success': False,
'release_mbid': None,
'matched_tracks': 0,
'total_files': len(file_infos),
'tags_written': 0,
'error': None,
}
if not file_infos:
result['error'] = 'No files provided'
return result
if not mb_service:
result['error'] = 'MusicBrainz service not available'
return result
# Step 1: Find the best release
release = _find_best_release(album_name, artist_name, len(file_infos), mb_service)
if not release:
result['error'] = f'No MusicBrainz release found for "{album_name}"'
return result
release_mbid = release.get('id', '')
result['release_mbid'] = release_mbid
# Step 2: Match files to tracklist
matched = _match_files_to_tracklist(file_infos, release)
result['matched_tracks'] = len(matched)
if len(matched) < len(file_infos) * 0.5:
result['error'] = (f'Only {len(matched)}/{len(file_infos)} tracks matched the release — '
f'aborting to avoid incorrect tagging')
return result
# Step 3: Build album-level tags (same for all files)
album_tags = {}
album_tags['MUSICBRAINZ_RELEASE_ID'] = release_mbid
rg = release.get('release-group', {})
if rg.get('id'):
album_tags['MUSICBRAINZ_RELEASEGROUPID'] = rg['id']
if rg.get('primary-type'):
album_tags['RELEASETYPE'] = rg['primary-type']
if rg.get('first-release-date'):
album_tags['ORIGINALDATE'] = rg['first-release-date']
ac = release.get('artist-credit', [])
if ac and isinstance(ac[0], dict):
aa = ac[0].get('artist', {})
if aa.get('id'):
album_tags['MUSICBRAINZ_ALBUMARTISTID'] = aa['id']
if release.get('status'):
album_tags['RELEASESTATUS'] = release['status']
if release.get('country'):
album_tags['RELEASECOUNTRY'] = release['country']
if release.get('barcode'):
album_tags['BARCODE'] = release['barcode']
media_list = release.get('media', [])
if media_list:
fmt = media_list[0].get('format', '')
if fmt:
album_tags['MEDIA'] = fmt
album_tags['TOTALDISCS'] = str(len(media_list))
label_info = release.get('label-info', [])
if label_info and isinstance(label_info[0], dict):
cat = label_info[0].get('catalog-number', '')
if cat:
album_tags['CATALOGNUMBER'] = cat
text_rep = release.get('text-representation', {})
if isinstance(text_rep, dict) and text_rep.get('script'):
album_tags['SCRIPT'] = text_rep['script']
if release.get('asin'):
album_tags['ASIN'] = release['asin']
# Album name and artist from the release (canonical MB values)
release_album_name = release.get('title', album_name)
release_artist_name = artist_name
if ac:
# Build full artist credit string
parts = []
for credit in ac:
if isinstance(credit, dict):
parts.append(credit.get('artist', {}).get('name', ''))
parts.append(credit.get('joinphrase', ''))
elif isinstance(credit, str):
parts.append(credit)
full_credit = ''.join(parts).strip()
if full_credit:
release_artist_name = full_credit
# Step 4: Write tags to matched files only (unmatched files keep their existing tags)
tags_written = 0
for fi in file_infos:
file_path = fi['path']
mb_track = matched.get(file_path)
# Only write to files that matched the tracklist — avoids corrupting
# bonus tracks or files from a different edition
if not mb_track:
continue
if not os.path.exists(file_path):
continue
try:
if file_lock_fn:
lock = file_lock_fn(file_path)
else:
lock = _DummyLock()
with lock:
audio = MutagenFile(file_path, easy=False)
if audio is None:
continue
# Write album-level tags
for tag_key, value in album_tags.items():
_write_tag_to_file(audio, tag_key, value)
# Write standard album/albumartist tags
_write_standard_tag(audio, 'album', release_album_name)
_write_standard_tag(audio, 'albumartist', release_artist_name)
# Write per-track tag (release track ID) if matched
if mb_track and mb_track.get('id'):
_write_tag_to_file(audio, 'MUSICBRAINZ_RELEASETRACKID', mb_track['id'])
audio.save()
tags_written += 1
except Exception as e:
logger.error(f"Error writing consistency tags to {file_path}: {e}")
result['tags_written'] = tags_written
result['success'] = tags_written > 0
logger.info(f"Album consistency complete: {tags_written}/{len(file_infos)} files tagged "
f"with release '{release_album_name}' ({release_mbid[:8]}...)")
return result
class _DummyLock:
"""No-op context manager when no file lock is provided."""
def __enter__(self):
return self
def __exit__(self, *args):
pass