|
|
|
|
@ -14088,7 +14088,7 @@ def _get_file_path_from_template(context: dict, template_type: str = 'album_path
|
|
|
|
|
# METADATA & COVER ART HELPERS (Ported from downloads.py)
|
|
|
|
|
# ===================================================================
|
|
|
|
|
from mutagen import File as MutagenFile
|
|
|
|
|
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TDRC, TRCK, TCON, TPE2, TPOS, TXXX, APIC, UFID, TSRC, TBPM
|
|
|
|
|
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TDRC, TRCK, TCON, TPE2, TPOS, TXXX, APIC, UFID, TSRC, TBPM, TCOP, TPUB
|
|
|
|
|
from mutagen.flac import FLAC, Picture
|
|
|
|
|
from mutagen.mp4 import MP4, MP4Cover, MP4FreeForm
|
|
|
|
|
from mutagen.oggvorbis import OggVorbis
|
|
|
|
|
@ -14510,10 +14510,13 @@ def _embed_album_art_metadata(audio_file, metadata: dict):
|
|
|
|
|
|
|
|
|
|
def _embed_source_ids(audio_file, metadata: dict):
|
|
|
|
|
"""
|
|
|
|
|
Lookup MusicBrainz, Deezer, and AudioDB metadata, then embed them along
|
|
|
|
|
with Spotify/iTunes source IDs as custom tags into the audio file.
|
|
|
|
|
Lookup MusicBrainz, Deezer, AudioDB, Tidal, Qobuz, Last.fm, and Genius
|
|
|
|
|
metadata, then embed them along with Spotify/iTunes source IDs as custom
|
|
|
|
|
tags into the audio file.
|
|
|
|
|
Tags written: source IDs, BPM (Deezer), mood/style (AudioDB), ISRC
|
|
|
|
|
(MB→Deezer fallback), and merged genres (Spotify+MB+AudioDB).
|
|
|
|
|
(MB→Deezer→Tidal→Qobuz fallback), copyright (Tidal→Qobuz),
|
|
|
|
|
label (Qobuz), URLs (Last.fm/Genius),
|
|
|
|
|
and merged genres (Spotify+MB+AudioDB+Last.fm).
|
|
|
|
|
One file write, one shot. Concurrent calls are safe — each service has
|
|
|
|
|
its own global rate limiter.
|
|
|
|
|
Operates on a non-easy-mode MutagenFile object (caller must save).
|
|
|
|
|
@ -14692,6 +14695,125 @@ def _embed_source_ids(audio_file, metadata: dict):
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"⚠️ AudioDB lookup failed (non-fatal): {e}")
|
|
|
|
|
|
|
|
|
|
# ── 2d. Tidal lookup for ISRC fallback, copyright, and source IDs ──
|
|
|
|
|
tidal_isrc = None
|
|
|
|
|
tidal_copyright = None
|
|
|
|
|
if track_title and artist_name:
|
|
|
|
|
try:
|
|
|
|
|
if tidal_client and tidal_client.is_authenticated():
|
|
|
|
|
td_result = tidal_client.search_track(artist_name, track_title)
|
|
|
|
|
if td_result and _names_match(td_result.get('title', ''), track_title):
|
|
|
|
|
td_track_id = td_result.get('id')
|
|
|
|
|
if td_track_id:
|
|
|
|
|
id_tags['TIDAL_TRACK_ID'] = str(td_track_id)
|
|
|
|
|
print(f"🎵 Tidal track matched: {td_track_id}")
|
|
|
|
|
td_artist = td_result.get('artist', {})
|
|
|
|
|
if isinstance(td_artist, dict) and td_artist.get('id'):
|
|
|
|
|
id_tags['TIDAL_ARTIST_ID'] = str(td_artist['id'])
|
|
|
|
|
# Get full details for ISRC and copyright
|
|
|
|
|
if td_track_id:
|
|
|
|
|
td_details = tidal_client.get_track(str(td_track_id))
|
|
|
|
|
if td_details:
|
|
|
|
|
td_isrc = td_details.get('isrc')
|
|
|
|
|
if td_isrc:
|
|
|
|
|
tidal_isrc = td_isrc
|
|
|
|
|
td_copyright = td_details.get('copyright')
|
|
|
|
|
if isinstance(td_copyright, dict):
|
|
|
|
|
td_copyright = td_copyright.get('text', td_copyright.get('name', ''))
|
|
|
|
|
if td_copyright:
|
|
|
|
|
tidal_copyright = td_copyright
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"⚠️ Tidal lookup failed (non-fatal): {e}")
|
|
|
|
|
|
|
|
|
|
# ── 2e. Qobuz lookup for ISRC fallback, copyright, label, and source IDs ──
|
|
|
|
|
qobuz_isrc = None
|
|
|
|
|
qobuz_copyright = None
|
|
|
|
|
qobuz_label = None
|
|
|
|
|
if track_title and artist_name:
|
|
|
|
|
try:
|
|
|
|
|
qz_client = qobuz_enrichment_worker.client if qobuz_enrichment_worker else None
|
|
|
|
|
if qz_client and qz_client.is_authenticated():
|
|
|
|
|
qz_result = qz_client.search_track(artist_name, track_title)
|
|
|
|
|
if qz_result:
|
|
|
|
|
qz_performer = (qz_result.get('performer') or {})
|
|
|
|
|
if not isinstance(qz_performer, dict):
|
|
|
|
|
qz_performer = {}
|
|
|
|
|
qz_artist_name = qz_performer.get('name', '')
|
|
|
|
|
if _names_match(qz_result.get('title', ''), track_title) and \
|
|
|
|
|
_names_match(qz_artist_name, artist_name):
|
|
|
|
|
qz_track_id = qz_result.get('id')
|
|
|
|
|
if qz_track_id:
|
|
|
|
|
id_tags['QOBUZ_TRACK_ID'] = str(qz_track_id)
|
|
|
|
|
print(f"🎵 Qobuz track matched: {qz_track_id}")
|
|
|
|
|
if isinstance(qz_performer, dict) and qz_performer.get('id'):
|
|
|
|
|
id_tags['QOBUZ_ARTIST_ID'] = str(qz_performer['id'])
|
|
|
|
|
qz_isrc = qz_result.get('isrc')
|
|
|
|
|
if isinstance(qz_isrc, dict):
|
|
|
|
|
qz_isrc = qz_isrc.get('value', qz_isrc.get('id', ''))
|
|
|
|
|
if qz_isrc:
|
|
|
|
|
qobuz_isrc = qz_isrc
|
|
|
|
|
qz_copyright = qz_result.get('copyright')
|
|
|
|
|
if isinstance(qz_copyright, dict):
|
|
|
|
|
qz_copyright = qz_copyright.get('text', qz_copyright.get('name', ''))
|
|
|
|
|
if qz_copyright and isinstance(qz_copyright, str):
|
|
|
|
|
qobuz_copyright = qz_copyright
|
|
|
|
|
qz_album = qz_result.get('album', {})
|
|
|
|
|
if isinstance(qz_album, dict):
|
|
|
|
|
qz_label_info = qz_album.get('label', {})
|
|
|
|
|
if isinstance(qz_label_info, dict) and qz_label_info.get('name'):
|
|
|
|
|
qobuz_label = qz_label_info['name']
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"⚠️ Qobuz lookup failed (non-fatal): {e}")
|
|
|
|
|
|
|
|
|
|
# ── 2f. Last.fm lookup for tags (genre merge) and URL ──
|
|
|
|
|
lastfm_tags = []
|
|
|
|
|
lastfm_url = None
|
|
|
|
|
if track_title and artist_name:
|
|
|
|
|
try:
|
|
|
|
|
lf_client = lastfm_worker.client if lastfm_worker else None
|
|
|
|
|
if lf_client:
|
|
|
|
|
lf_result = lf_client.get_track_info(artist_name, track_title)
|
|
|
|
|
if lf_result:
|
|
|
|
|
lf_url = lf_result.get('url')
|
|
|
|
|
if lf_url:
|
|
|
|
|
lastfm_url = lf_url
|
|
|
|
|
lf_toptags = lf_result.get('toptags', {})
|
|
|
|
|
if isinstance(lf_toptags, dict):
|
|
|
|
|
tag_list = lf_toptags.get('tag', [])
|
|
|
|
|
if isinstance(tag_list, list):
|
|
|
|
|
lastfm_tags = [t.get('name', '') for t in tag_list if isinstance(t, dict) and t.get('name')]
|
|
|
|
|
elif isinstance(tag_list, dict) and tag_list.get('name'):
|
|
|
|
|
lastfm_tags = [tag_list['name']]
|
|
|
|
|
print(f"🎵 Last.fm track info found: {len(lastfm_tags)} tags")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"⚠️ Last.fm lookup failed (non-fatal): {e}")
|
|
|
|
|
|
|
|
|
|
# ── 2g. Genius lookup for source ID and URL ──
|
|
|
|
|
# Genius has an aggressive global rate limiter (30→60→120s backoff) that
|
|
|
|
|
# blocks ALL callers including post-processing. We check the backoff
|
|
|
|
|
# state directly and skip immediately if Genius is rate-limited, rather
|
|
|
|
|
# than entering search_song which would sleep for up to 120s.
|
|
|
|
|
genius_url = None
|
|
|
|
|
if track_title and artist_name:
|
|
|
|
|
try:
|
|
|
|
|
import core.genius_client as _genius_module
|
|
|
|
|
if time.time() < _genius_module._rate_limit_until:
|
|
|
|
|
print("⏭️ Genius rate-limited, skipping (non-blocking)")
|
|
|
|
|
else:
|
|
|
|
|
g_client = genius_worker.client if genius_worker else None
|
|
|
|
|
if g_client:
|
|
|
|
|
g_result = g_client.search_song(artist_name, track_title)
|
|
|
|
|
if g_result:
|
|
|
|
|
g_id = g_result.get('id')
|
|
|
|
|
if g_id:
|
|
|
|
|
id_tags['GENIUS_TRACK_ID'] = str(g_id)
|
|
|
|
|
print(f"🎵 Genius song matched: {g_id}")
|
|
|
|
|
g_url = g_result.get('url')
|
|
|
|
|
if g_url:
|
|
|
|
|
genius_url = g_url
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"⚠️ Genius lookup failed (non-fatal): {e}")
|
|
|
|
|
|
|
|
|
|
if not id_tags and not deezer_bpm and not deezer_isrc and not audiodb_mood and not audiodb_style:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
@ -14778,8 +14900,8 @@ def _embed_source_ids(audio_file, metadata: dict):
|
|
|
|
|
audio_file['----:com.apple.iTunes:STYLE'] = [MP4FreeForm(audiodb_style.encode('utf-8'))]
|
|
|
|
|
print(f"🎨 Style: {audiodb_style}")
|
|
|
|
|
|
|
|
|
|
# ── 4. Merge genres (Spotify + MusicBrainz + AudioDB) and overwrite tag ──
|
|
|
|
|
enrichment_genres = mb_genres + ([audiodb_genre] if audiodb_genre else [])
|
|
|
|
|
# ── 4. Merge genres (Spotify + MusicBrainz + AudioDB + Last.fm) and overwrite tag ──
|
|
|
|
|
enrichment_genres = mb_genres + ([audiodb_genre] if audiodb_genre else []) + lastfm_tags
|
|
|
|
|
if enrichment_genres:
|
|
|
|
|
spotify_genres = [g.strip() for g in metadata.get('genre', '').split(',') if g.strip()]
|
|
|
|
|
seen = set()
|
|
|
|
|
@ -14802,8 +14924,8 @@ def _embed_source_ids(audio_file, metadata: dict):
|
|
|
|
|
audio_file['\xa9gen'] = [genre_string]
|
|
|
|
|
print(f"🎶 Genres merged: {genre_string}")
|
|
|
|
|
|
|
|
|
|
# ── 5. Write ISRC if available (MusicBrainz first, Deezer fallback) ──
|
|
|
|
|
final_isrc = isrc or deezer_isrc
|
|
|
|
|
# ── 5. Write ISRC if available (MusicBrainz → Deezer → Tidal → Qobuz fallback) ──
|
|
|
|
|
final_isrc = isrc or deezer_isrc or tidal_isrc or qobuz_isrc
|
|
|
|
|
if final_isrc:
|
|
|
|
|
if isinstance(audio_file.tags, ID3):
|
|
|
|
|
audio_file.tags.add(TSRC(encoding=3, text=[final_isrc]))
|
|
|
|
|
@ -14811,9 +14933,48 @@ def _embed_source_ids(audio_file, metadata: dict):
|
|
|
|
|
audio_file['ISRC'] = [final_isrc]
|
|
|
|
|
elif isinstance(audio_file, MP4):
|
|
|
|
|
audio_file['----:com.apple.iTunes:ISRC'] = [MP4FreeForm(final_isrc.encode('utf-8'))]
|
|
|
|
|
source = "MusicBrainz" if isrc else "Deezer"
|
|
|
|
|
source = "MusicBrainz" if isrc else "Deezer" if deezer_isrc else "Tidal" if tidal_isrc else "Qobuz"
|
|
|
|
|
print(f"🔖 ISRC ({source}): {final_isrc}")
|
|
|
|
|
|
|
|
|
|
# ── 6. Write copyright tag (Tidal → Qobuz fallback) ──
|
|
|
|
|
final_copyright = tidal_copyright or qobuz_copyright
|
|
|
|
|
if final_copyright:
|
|
|
|
|
if isinstance(audio_file.tags, ID3):
|
|
|
|
|
audio_file.tags.add(TCOP(encoding=3, text=[final_copyright]))
|
|
|
|
|
elif isinstance(audio_file, (FLAC, OggVorbis)):
|
|
|
|
|
audio_file['COPYRIGHT'] = [final_copyright]
|
|
|
|
|
elif isinstance(audio_file, MP4):
|
|
|
|
|
audio_file['cprt'] = [final_copyright]
|
|
|
|
|
source = "Tidal" if tidal_copyright else "Qobuz"
|
|
|
|
|
print(f"©️ Copyright ({source}): {final_copyright[:60]}")
|
|
|
|
|
|
|
|
|
|
# ── 7. Write label/publisher tag (from Qobuz) ──
|
|
|
|
|
if qobuz_label:
|
|
|
|
|
if isinstance(audio_file.tags, ID3):
|
|
|
|
|
audio_file.tags.add(TPUB(encoding=3, text=[qobuz_label]))
|
|
|
|
|
elif isinstance(audio_file, (FLAC, OggVorbis)):
|
|
|
|
|
audio_file['LABEL'] = [qobuz_label]
|
|
|
|
|
elif isinstance(audio_file, MP4):
|
|
|
|
|
audio_file['----:com.apple.iTunes:LABEL'] = [MP4FreeForm(qobuz_label.encode('utf-8'))]
|
|
|
|
|
print(f"🏷️ Label (Qobuz): {qobuz_label}")
|
|
|
|
|
|
|
|
|
|
# ── 8. Write Last.fm and Genius URLs as custom tags ──
|
|
|
|
|
if lastfm_url:
|
|
|
|
|
if isinstance(audio_file.tags, ID3):
|
|
|
|
|
audio_file.tags.add(TXXX(encoding=3, desc='LASTFM_URL', text=[lastfm_url]))
|
|
|
|
|
elif isinstance(audio_file, (FLAC, OggVorbis)):
|
|
|
|
|
audio_file['LASTFM_URL'] = [lastfm_url]
|
|
|
|
|
elif isinstance(audio_file, MP4):
|
|
|
|
|
audio_file['----:com.apple.iTunes:LASTFM_URL'] = [MP4FreeForm(lastfm_url.encode('utf-8'))]
|
|
|
|
|
|
|
|
|
|
if genius_url:
|
|
|
|
|
if isinstance(audio_file.tags, ID3):
|
|
|
|
|
audio_file.tags.add(TXXX(encoding=3, desc='GENIUS_URL', text=[genius_url]))
|
|
|
|
|
elif isinstance(audio_file, (FLAC, OggVorbis)):
|
|
|
|
|
audio_file['GENIUS_URL'] = [genius_url]
|
|
|
|
|
elif isinstance(audio_file, MP4):
|
|
|
|
|
audio_file['----:com.apple.iTunes:GENIUS_URL'] = [MP4FreeForm(genius_url.encode('utf-8'))]
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"⚠️ Error embedding source IDs (non-fatal): {e}")
|
|
|
|
|
|
|
|
|
|
|