"""Source metadata extraction and source-ID embedding helpers.""" from __future__ import annotations import re import socket import threading import time from collections import OrderedDict from typing import Any, Dict import requests from core.imports.context import ( extract_artist_name, get_import_clean_artist, get_import_clean_title, get_import_context_album, get_import_original_search, get_import_source, get_import_source_ids, get_import_track_info, get_source_tag_names, normalize_import_context, ) from core.metadata.artist_resolution import resolve_track_artists from core.metadata.registry import get_itunes_client from database.music_database import get_database from core.metadata.common import ( get_config_manager, get_mutagen_symbols, is_vorbis_like, ) from utils.logging_config import get_logger as _create_logger __all__ = [ "extract_source_metadata", "embed_source_ids", "normalize_album_cache_key", "mb_release_cache", "mb_release_cache_lock", "mb_release_detail_cache", "mb_release_detail_cache_lock", ] _MB_RELEASE_CACHE_MAX_ENTRIES = 4096 _MB_RELEASE_DETAIL_CACHE_MAX_ENTRIES = 4096 mb_release_cache: "OrderedDict[tuple, str]" = OrderedDict() mb_release_cache_lock = threading.RLock() mb_release_detail_cache: "OrderedDict[str, Dict[str, Any]]" = OrderedDict() mb_release_detail_cache_lock = threading.RLock() logger = _create_logger("metadata.source") _SOURCE_NETWORK_EXCEPTIONS = (requests.RequestException, socket.timeout, TimeoutError) _EDITION_PAREN_RE = re.compile( r'\s*[\(\[]\s*(?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector|' r'limited|bonus|platinum|gold|super\s*deluxe|standard)' r'(?:\s+(?:edition|version))?[^)\]]*[\)\]]', re.IGNORECASE, ) _EDITION_BARE_RE = re.compile( r'\s+(?:-\s+)?(?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector|' r'limited|bonus|platinum|gold|super\s*deluxe|standard)' r'(?:\s+(?:edition|version))?\s*$', re.IGNORECASE, ) def normalize_album_cache_key(album_name: str) -> str: result = _EDITION_PAREN_RE.sub("", album_name or "") result = _EDITION_BARE_RE.sub("", result) return result.lower().strip() def _bounded_cache_get(cache, key): value = cache.get(key) if value is not None and hasattr(cache, "move_to_end"): cache.move_to_end(key) return value def _bounded_cache_set(cache, key, value, max_entries: int) -> None: cache[key] = value if hasattr(cache, "move_to_end"): cache.move_to_end(key) while len(cache) > max_entries: cache.popitem(last=False) def _call_source_lookup(label: str, func, *args, **kwargs): try: return func(*args, **kwargs) except _SOURCE_NETWORK_EXCEPTIONS as exc: logger.warning("%s lookup failed (network): %s", label, exc) return None SOURCE_TAG_CONFIG = { "SPOTIFY_TRACK_ID": "spotify.tags.track_id", "SPOTIFY_ARTIST_ID": "spotify.tags.artist_id", "SPOTIFY_ALBUM_ID": "spotify.tags.album_id", "ITUNES_TRACK_ID": "itunes.tags.track_id", "ITUNES_ARTIST_ID": "itunes.tags.artist_id", "ITUNES_ALBUM_ID": "itunes.tags.album_id", "MUSICBRAINZ_RECORDING_ID": "musicbrainz.tags.recording_id", "MUSICBRAINZ_ARTIST_ID": "musicbrainz.tags.artist_id", "MUSICBRAINZ_RELEASE_ID": "musicbrainz.tags.release_id", "MUSICBRAINZ_RELEASEGROUPID": "musicbrainz.tags.release_group_id", "MUSICBRAINZ_ALBUMARTISTID": "musicbrainz.tags.album_artist_id", "MUSICBRAINZ_RELEASETRACKID": "musicbrainz.tags.release_track_id", "RELEASETYPE": "musicbrainz.tags.release_type", "ORIGINALDATE": "musicbrainz.tags.original_date", "RELEASESTATUS": "musicbrainz.tags.release_status", "RELEASECOUNTRY": "musicbrainz.tags.release_country", "BARCODE": "musicbrainz.tags.barcode", "MEDIA": "musicbrainz.tags.media", "TOTALDISCS": "musicbrainz.tags.total_discs", "CATALOGNUMBER": "musicbrainz.tags.catalog_number", "SCRIPT": "musicbrainz.tags.script", "ASIN": "musicbrainz.tags.asin", "DEEZER_TRACK_ID": "deezer.tags.track_id", "DEEZER_ARTIST_ID": "deezer.tags.artist_id", "AUDIODB_TRACK_ID": "audiodb.tags.track_id", "TIDAL_TRACK_ID": "tidal.tags.track_id", "TIDAL_ARTIST_ID": "tidal.tags.artist_id", "HIFI_TRACK_ID": "hifi.tags.track_id", "HIFI_ARTIST_ID": "hifi.tags.artist_id", "QOBUZ_TRACK_ID": "qobuz.tags.track_id", "QOBUZ_ARTIST_ID": "qobuz.tags.artist_id", "GENIUS_TRACK_ID": "genius.tags.track_id", } DEFAULT_SOURCE_ORDER = ["musicbrainz", "deezer", "audiodb", "tidal", "hifi", "qobuz", "lastfm", "genius"] ID3_TAG_MAP = { "MUSICBRAINZ_RECORDING_ID": ("UFID", "http://musicbrainz.org"), "MUSICBRAINZ_ARTIST_ID": ("TXXX", "MusicBrainz Artist Id"), "MUSICBRAINZ_RELEASE_ID": ("TXXX", "MusicBrainz Album Id"), "MUSICBRAINZ_RELEASEGROUPID": ("TXXX", "MusicBrainz Release Group Id"), "MUSICBRAINZ_ALBUMARTISTID": ("TXXX", "MusicBrainz Album Artist Id"), "MUSICBRAINZ_RELEASETRACKID": ("TXXX", "MusicBrainz Release Track Id"), "RELEASETYPE": ("TXXX", "MusicBrainz Album Type"), "RELEASESTATUS": ("TXXX", "MusicBrainz Album Status"), "RELEASECOUNTRY": ("TXXX", "MusicBrainz Album Release Country"), "ORIGINALDATE": ("TDOR", None), "MEDIA": ("TMED", None), } VORBIS_TAG_MAP = { "MUSICBRAINZ_RECORDING_ID": "MUSICBRAINZ_TRACKID", "MUSICBRAINZ_ARTIST_ID": "MUSICBRAINZ_ARTISTID", "MUSICBRAINZ_RELEASE_ID": "MUSICBRAINZ_ALBUMID", "MUSICBRAINZ_RELEASEGROUPID": "MUSICBRAINZ_RELEASEGROUPID", "MUSICBRAINZ_ALBUMARTISTID": "MUSICBRAINZ_ALBUMARTISTID", "MUSICBRAINZ_RELEASETRACKID": "MUSICBRAINZ_RELEASETRACKID", } MP4_TAG_MAP = { "MUSICBRAINZ_RECORDING_ID": "MusicBrainz Track Id", "MUSICBRAINZ_ARTIST_ID": "MusicBrainz Artist Id", "MUSICBRAINZ_RELEASE_ID": "MusicBrainz Album Id", "MUSICBRAINZ_RELEASEGROUPID": "MusicBrainz Release Group Id", "MUSICBRAINZ_ALBUMARTISTID": "MusicBrainz Album Artist Id", "MUSICBRAINZ_RELEASETRACKID": "MusicBrainz Release Track Id", "RELEASETYPE": "MusicBrainz Album Type", "RELEASESTATUS": "MusicBrainz Album Status", "RELEASECOUNTRY": "MusicBrainz Album Release Country", } def _tag_enabled(cfg, path: str) -> bool: return cfg.get(path, True) is not False def _names_match(a: str, b: str, threshold: float = 0.75) -> bool: if not a or not b: return False from difflib import SequenceMatcher norm = lambda s: re.sub(r"[^a-z0-9 ]", "", re.sub(r"\(.*?\)", "", s).lower()).strip() return SequenceMatcher(None, norm(a), norm(b)).ratio() >= threshold def _normalize_release_date_tag(value: Any) -> str: """Return a tag-safe release date without inventing missing precision.""" raw = str(value or "").strip() if not raw: return "" # Source APIs commonly return ISO timestamps. Audio DATE/TDRC tags should # receive only the date precision the source actually provided. raw = raw.split("T", 1)[0].strip() match = re.match(r"^(\d{4})(?:-(\d{2})(?:-(\d{2}))?)?$", raw) if not match: return "" year, month, day = match.groups() if day: return f"{year}-{month}-{day}" if month: return f"{year}-{month}" return year def _collect_source_ids(metadata: dict, cfg) -> dict: source_ids = {} source = (metadata.get("source") or "").strip().lower() if source: source_tag_names = get_source_tag_names(source) source_track_id = metadata.get("source_track_id") source_artist_id = metadata.get("source_artist_id") source_album_id = metadata.get("source_album_id") if cfg.get(f"{source}.embed_tags", True) is not False: if source_tag_names.get("track") and source_track_id: source_ids[source_tag_names["track"]] = source_track_id if source_tag_names.get("artist") and source_artist_id: source_ids[source_tag_names["artist"]] = source_artist_id if source_tag_names.get("album") and source_album_id: source_ids[source_tag_names["album"]] = source_album_id if not source_ids: if cfg.get("spotify.embed_tags", True) is not False: if metadata.get("spotify_track_id"): source_ids["SPOTIFY_TRACK_ID"] = metadata["spotify_track_id"] if metadata.get("spotify_artist_id"): source_ids["SPOTIFY_ARTIST_ID"] = metadata["spotify_artist_id"] if metadata.get("spotify_album_id"): source_ids["SPOTIFY_ALBUM_ID"] = metadata["spotify_album_id"] if cfg.get("itunes.embed_tags", True) is not False: if metadata.get("itunes_track_id"): source_ids["ITUNES_TRACK_ID"] = metadata["itunes_track_id"] if metadata.get("itunes_artist_id"): source_ids["ITUNES_ARTIST_ID"] = metadata["itunes_artist_id"] if metadata.get("itunes_album_id"): source_ids["ITUNES_ALBUM_ID"] = metadata["itunes_album_id"] return source_ids def _process_musicbrainz_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None: if cfg.get("musicbrainz.embed_tags", True) is False: return if not track_title or not artist_name: return mb_worker = getattr(runtime, "mb_worker", None) mb_service = mb_worker.mb_service if mb_worker else None if not mb_service: return result = _call_source_lookup("MusicBrainz recording", mb_service.match_recording, track_title, artist_name) if result and result.get("mbid"): pp["recording_mbid"] = result["mbid"] pp["id_tags"]["MUSICBRAINZ_RECORDING_ID"] = pp["recording_mbid"] details = _call_source_lookup( "MusicBrainz recording details", mb_service.mb_client.get_recording, pp["recording_mbid"], includes=["isrcs", "genres"], ) if details: isrcs = details.get("isrcs", []) if isrcs: pp["isrc"] = isrcs[0] pp["mb_genres"] = [g["name"] for g in sorted(details.get("genres", []), key=lambda x: x.get("count", 0), reverse=True)] track_artist_name = metadata.get("artist", "") or artist_name if ", " in track_artist_name: track_artist_name = track_artist_name.split(", ")[0] artist_result = _call_source_lookup("MusicBrainz artist", mb_service.match_artist, track_artist_name) if artist_result and artist_result.get("mbid"): pp["artist_mbid"] = artist_result["mbid"] pp["id_tags"]["MUSICBRAINZ_ARTIST_ID"] = pp["artist_mbid"] album_name_for_mb = metadata.get("album", "") if album_name_for_mb: artist_key = (pp.get("batch_artist_name") or artist_name).lower().strip() normalized_album_key = normalize_album_cache_key(album_name_for_mb) rc_key_norm = (normalized_album_key, artist_key) rc_key_exact = (album_name_for_mb.lower().strip(), artist_key) release_mbid = None with mb_release_cache_lock: cached = _bounded_cache_get(mb_release_cache, rc_key_norm) if cached is None: cached = _bounded_cache_get(mb_release_cache, rc_key_exact) if cached: release_mbid = cached else: # Persistent cache check BEFORE the live MB lookup. If a # previous SoulSync run already resolved this album's # release MBID, reuse it — guarantees every track of the # same album gets the SAME MUSICBRAINZ_ALBUMID tag, even # across server restarts and after the in-memory bounded # cache evicts the entry. Strictly additive: any failure # in the persistent lookup falls through to the live MB # query exactly as today. try: from core.metadata import album_mbid_cache as _persisted_cache persisted = _persisted_cache.lookup(normalized_album_key, artist_key) except Exception: persisted = None if persisted: release_mbid = persisted else: rc_result = _call_source_lookup("MusicBrainz release", mb_service.match_release, album_name_for_mb, artist_name) if rc_result and rc_result.get("mbid"): release_mbid = rc_result["mbid"] if release_mbid: _bounded_cache_set(mb_release_cache, rc_key_norm, release_mbid, _MB_RELEASE_CACHE_MAX_ENTRIES) _bounded_cache_set(mb_release_cache, rc_key_exact, release_mbid, _MB_RELEASE_CACHE_MAX_ENTRIES) # Also persist for future SoulSync runs. Defensive # try/except so a DB write failure can't block the # in-memory store + tag write that follow. try: from core.metadata import album_mbid_cache as _persisted_cache _persisted_cache.record(normalized_album_key, artist_key, release_mbid) except Exception as e: logger.debug("MBID cache persist failed: %s", e) pp["release_mbid"] = release_mbid or "" if pp["release_mbid"]: pp["id_tags"]["MUSICBRAINZ_RELEASE_ID"] = pp["release_mbid"] if pp["release_mbid"]: with mb_release_detail_cache_lock: release_detail = _bounded_cache_get(mb_release_detail_cache, pp["release_mbid"]) if release_detail is None: release_detail = _call_source_lookup( "MusicBrainz release details", mb_service.mb_client.get_release, pp["release_mbid"], includes=["release-groups", "labels", "media", "artist-credits", "recordings", "genres"], ) or {} with mb_release_detail_cache_lock: _bounded_cache_set(mb_release_detail_cache, pp["release_mbid"], release_detail, _MB_RELEASE_DETAIL_CACHE_MAX_ENTRIES) if release_detail: rg = release_detail.get("release-group", {}) if rg.get("id"): pp["id_tags"]["MUSICBRAINZ_RELEASEGROUPID"] = rg["id"] ac = release_detail.get("artist-credit", []) if ac and isinstance(ac[0], dict): aa = ac[0].get("artist", {}) if aa.get("id"): pp["id_tags"]["MUSICBRAINZ_ALBUMARTISTID"] = aa["id"] if rg.get("primary-type"): pp["id_tags"]["RELEASETYPE"] = rg["primary-type"] if rg.get("first-release-date"): pp["id_tags"]["ORIGINALDATE"] = rg["first-release-date"] if not pp["release_year"] and len(rg["first-release-date"]) >= 4: year = rg["first-release-date"][:4] if year.isdigit(): pp["release_year"] = year if release_detail.get("status"): pp["id_tags"]["RELEASESTATUS"] = release_detail["status"] if release_detail.get("country"): pp["id_tags"]["RELEASECOUNTRY"] = release_detail["country"] if release_detail.get("barcode"): pp["id_tags"]["BARCODE"] = release_detail["barcode"] media_list = release_detail.get("media", []) if media_list: fmt = media_list[0].get("format", "") if fmt: pp["id_tags"]["MEDIA"] = fmt pp["id_tags"]["TOTALDISCS"] = str(len(media_list)) label_info = release_detail.get("label-info", []) if label_info and isinstance(label_info[0], dict): cat = label_info[0].get("catalog-number", "") if cat: pp["id_tags"]["CATALOGNUMBER"] = cat text_rep = release_detail.get("text-representation", {}) if isinstance(text_rep, dict) and text_rep.get("script"): pp["id_tags"]["SCRIPT"] = text_rep["script"] if release_detail.get("asin"): pp["id_tags"]["ASIN"] = release_detail["asin"] track_num = metadata.get("track_number") disc_num = metadata.get("disc_number") or 1 if track_num and media_list: try: track_num_int = int(track_num) disc_num_int = int(disc_num) for medium in media_list: if medium.get("position", 1) == disc_num_int: for mtrack in (medium.get("tracks") or medium.get("track-list", [])): if mtrack.get("position") == track_num_int: if mtrack.get("id"): pp["id_tags"]["MUSICBRAINZ_RELEASETRACKID"] = mtrack["id"] release_recording = mtrack.get("recording", {}) if release_recording.get("id"): pp["recording_mbid"] = release_recording["id"] pp["id_tags"]["MUSICBRAINZ_RECORDING_ID"] = release_recording["id"] break break except (ValueError, TypeError): pass # Genre fallback chain: most MusicBrainz recordings don't carry genres at # the track level, but releases and artists usually do. If the recording # came back empty, try the release; if that's empty too, fetch the artist # with `includes=['genres']` and use that. _release_detail_for_genres = locals().get("release_detail") if not pp["mb_genres"] and _release_detail_for_genres: pp["mb_genres"] = [ g["name"] for g in sorted( _release_detail_for_genres.get("genres", []), key=lambda x: x.get("count", 0), reverse=True, ) ] if not pp["mb_genres"] and pp.get("artist_mbid"): artist_detail = _call_source_lookup( "MusicBrainz artist details", mb_service.mb_client.get_artist, pp["artist_mbid"], includes=["genres"], ) if artist_detail: pp["mb_genres"] = [ g["name"] for g in sorted( artist_detail.get("genres", []), key=lambda x: x.get("count", 0), reverse=True, ) ] def _process_deezer_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None: if cfg.get("deezer.embed_tags", True) is False: return if not track_title or not artist_name: return deezer_worker = getattr(runtime, "deezer_worker", None) dz_client = deezer_worker.client if deezer_worker else None if not dz_client: return dz_result = _call_source_lookup("Deezer track", dz_client.search_track, artist_name, track_title) if dz_result and _names_match(dz_result.get("title", ""), track_title) and _names_match(dz_result.get("artist", {}).get("name", ""), artist_name): dz_track_id = dz_result["id"] pp["id_tags"]["DEEZER_TRACK_ID"] = str(dz_track_id) dz_artist_id = dz_result.get("artist", {}).get("id") if dz_artist_id: pp["id_tags"]["DEEZER_ARTIST_ID"] = str(dz_artist_id) dz_details = _call_source_lookup("Deezer track details", dz_client.get_track_details, dz_track_id) if dz_details: bpm_val = dz_details.get("bpm") if bpm_val and bpm_val > 0: pp["deezer_bpm"] = bpm_val dz_isrc = dz_details.get("isrc") if dz_isrc: pp["deezer_isrc"] = dz_isrc if not pp["release_year"]: dz_album = dz_result.get("album", {}) dz_release = (dz_album.get("release_date", "") if isinstance(dz_album, dict) else "") or "" if len(dz_release) >= 4 and dz_release[:4].isdigit(): pp["release_year"] = dz_release[:4] def _process_audiodb_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None: if cfg.get("audiodb.embed_tags", True) is False: return if not track_title or not artist_name: return audiodb_worker = getattr(runtime, "audiodb_worker", None) adb_client = audiodb_worker.client if audiodb_worker else None if not adb_client: return adb_result = _call_source_lookup("AudioDB track", adb_client.search_track, artist_name, track_title) if adb_result and _names_match(adb_result.get("strTrack", ""), track_title) and _names_match(adb_result.get("strArtist", ""), artist_name): adb_track_id = adb_result.get("idTrack") if adb_track_id: pp["id_tags"]["AUDIODB_TRACK_ID"] = str(adb_track_id) adb_mb_track = adb_result.get("strMusicBrainzID") if adb_mb_track and "MUSICBRAINZ_RECORDING_ID" not in pp["id_tags"]: pp["id_tags"]["MUSICBRAINZ_RECORDING_ID"] = adb_mb_track pp["recording_mbid"] = adb_mb_track adb_mb_artist = adb_result.get("strMusicBrainzArtistID") if adb_mb_artist and "MUSICBRAINZ_ARTIST_ID" not in pp["id_tags"]: pp["id_tags"]["MUSICBRAINZ_ARTIST_ID"] = adb_mb_artist pp["artist_mbid"] = adb_mb_artist pp["audiodb_mood"] = adb_result.get("strMood") or None pp["audiodb_style"] = adb_result.get("strStyle") or None pp["audiodb_genre"] = adb_result.get("strGenre") or None def _process_tidal_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None: if cfg.get("tidal.embed_tags", True) is False: return if not track_title or not artist_name: return tidal_client = getattr(runtime, "tidal_client", None) if not (tidal_client and tidal_client.is_authenticated()): return td_result = _call_source_lookup("Tidal track", tidal_client.search_track, artist_name, track_title) if td_result and _names_match(td_result.get("title", ""), track_title): td_track_id = td_result.get("id") if td_track_id: pp["id_tags"]["TIDAL_TRACK_ID"] = str(td_track_id) td_artist = td_result.get("artist", {}) if isinstance(td_artist, dict) and td_artist.get("id"): pp["id_tags"]["TIDAL_ARTIST_ID"] = str(td_artist["id"]) if td_track_id: td_details = _call_source_lookup("Tidal track details", tidal_client.get_track, str(td_track_id)) if td_details: pp["tidal_isrc"] = td_details.get("isrc") td_bpm = td_details.get("bpm") if td_bpm and td_bpm > 0: pp["tidal_bpm"] = td_bpm td_copyright = td_details.get("copyright") if isinstance(td_copyright, dict): td_copyright = td_copyright.get("text", td_copyright.get("name", "")) pp["tidal_copyright"] = td_copyright or None if not pp["release_year"]: td_album = td_result.get("album", {}) td_release = "" if isinstance(td_album, dict): td_release = str(td_album.get("release_date", "") or td_album.get("releaseDate", "") or "") if len(td_release) >= 4 and td_release[:4].isdigit(): pp["release_year"] = td_release[:4] def _process_hifi_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None: if cfg.get("hifi.embed_tags", True) is False: return if not track_title or not artist_name: return hifi_client = getattr(runtime, "hifi_client", None) if not hifi_client: return hifi_results = _call_source_lookup("HiFi track", hifi_client.search_tracks, track_title, artist_name) if hifi_results and len(hifi_results) > 0: hifi_track = hifi_results[0] if _names_match(hifi_track.get("title", ""), track_title): hifi_track_id = hifi_track.get("id") if hifi_track_id: pp["id_tags"]["HIFI_TRACK_ID"] = str(hifi_track_id) hifi_artist_id = hifi_track.get("artist_id") if hifi_artist_id: pp["id_tags"]["HIFI_ARTIST_ID"] = str(hifi_artist_id) if hifi_track_id: hifi_details = _call_source_lookup("HiFi track details", hifi_client.get_track_info, hifi_track_id) if hifi_details: hifi_isrc = hifi_details.get("isrc") if hifi_isrc: pp["hifi_isrc"] = hifi_isrc hifi_bpm = hifi_details.get("bpm") if hifi_bpm and hifi_bpm > 0: pp["hifi_bpm"] = hifi_bpm hifi_copyright = hifi_details.get("copyright") if hifi_copyright: pp["hifi_copyright"] = hifi_copyright if not pp["release_year"]: hifi_album_id = hifi_track.get("album_id") if hifi_album_id: hifi_album = _call_source_lookup("HiFi album", hifi_client.get_album, hifi_album_id) if hifi_album: hifi_release = str(hifi_album.get("release_date", "") or "") if len(hifi_release) >= 4 and hifi_release[:4].isdigit(): pp["release_year"] = hifi_release[:4] def _process_qobuz_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None: if cfg.get("qobuz.embed_tags", True) is False: return if not track_title or not artist_name: return qobuz_worker = getattr(runtime, "qobuz_enrichment_worker", None) qz_client = qobuz_worker.client if qobuz_worker else None if not (qz_client and qz_client.is_authenticated()): return qz_result = _call_source_lookup("Qobuz track", qz_client.search_track, artist_name, track_title) if qz_result: qz_performer = qz_result.get("performer") or {} if not isinstance(qz_performer, dict): qz_performer = {} qz_artist_name = qz_performer.get("name", "") if _names_match(qz_result.get("title", ""), track_title) and _names_match(qz_artist_name, artist_name): qz_track_id = qz_result.get("id") if qz_track_id: pp["id_tags"]["QOBUZ_TRACK_ID"] = str(qz_track_id) if qz_performer.get("id"): pp["id_tags"]["QOBUZ_ARTIST_ID"] = str(qz_performer["id"]) qz_isrc = qz_result.get("isrc") if isinstance(qz_isrc, dict): qz_isrc = qz_isrc.get("value", qz_isrc.get("id", "")) if qz_isrc: pp["qobuz_isrc"] = qz_isrc qz_copyright = qz_result.get("copyright") if isinstance(qz_copyright, dict): qz_copyright = qz_copyright.get("text", qz_copyright.get("name", "")) if isinstance(qz_copyright, str): pp["qobuz_copyright"] = qz_copyright qz_album = qz_result.get("album", {}) if isinstance(qz_album, dict): qz_label_info = qz_album.get("label", {}) if isinstance(qz_label_info, dict) and qz_label_info.get("name"): pp["qobuz_label"] = qz_label_info["name"] if not pp["release_year"]: qz_release = str(qz_album.get("release_date_original", "") or "") if not qz_release: qz_ts = qz_album.get("released_at") if qz_ts and isinstance(qz_ts, (int, float)) and qz_ts > 0: import datetime as _dt qz_release = str(_dt.datetime.utcfromtimestamp(qz_ts).year) if len(qz_release) >= 4 and qz_release[:4].isdigit(): pp["release_year"] = qz_release[:4] def _process_lastfm_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None: if cfg.get("lastfm.embed_tags", True) is False: return if not track_title or not artist_name: return lastfm_worker = getattr(runtime, "lastfm_worker", None) lf_client = lastfm_worker.client if lastfm_worker else None if not lf_client: return lf_result = _call_source_lookup("Last.fm track", lf_client.get_track_info, artist_name, track_title) if lf_result: lf_url = lf_result.get("url") if lf_url: pp["lastfm_url"] = lf_url lf_toptags = lf_result.get("toptags", {}) if isinstance(lf_toptags, dict): tag_list = lf_toptags.get("tag", []) if isinstance(tag_list, list): pp["lastfm_tags"] = [tag.get("name", "") for tag in tag_list if isinstance(tag, dict) and tag.get("name")] elif isinstance(tag_list, dict) and tag_list.get("name"): pp["lastfm_tags"] = [tag_list["name"]] def _process_genius_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None: if cfg.get("genius.embed_tags", True) is False: return if not track_title or not artist_name: return import core.genius_client as _genius_module if time.time() < _genius_module._rate_limit_until: logger.info("Genius rate-limited, skipping (non-blocking)") return genius_worker = getattr(runtime, "genius_worker", None) g_client = genius_worker.client if genius_worker else None if not g_client: return g_result = _call_source_lookup("Genius track", g_client.search_song, artist_name, track_title) if g_result: g_id = g_result.get("id") if g_id: pp["id_tags"]["GENIUS_TRACK_ID"] = str(g_id) g_url = g_result.get("url") if g_url: pp["genius_url"] = g_url def _process_source_enrichment(source_name: str, pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None: if source_name == "musicbrainz": _process_musicbrainz_source(pp, metadata, cfg, runtime, track_title, artist_name) elif source_name == "deezer": _process_deezer_source(pp, metadata, cfg, runtime, track_title, artist_name) elif source_name == "audiodb": _process_audiodb_source(pp, metadata, cfg, runtime, track_title, artist_name) elif source_name == "tidal": _process_tidal_source(pp, metadata, cfg, runtime, track_title, artist_name) elif source_name == "hifi": _process_hifi_source(pp, metadata, cfg, runtime, track_title, artist_name) elif source_name == "qobuz": _process_qobuz_source(pp, metadata, cfg, runtime, track_title, artist_name) elif source_name == "lastfm": _process_lastfm_source(pp, metadata, cfg, runtime, track_title, artist_name) elif source_name == "genius": _process_genius_source(pp, metadata, cfg, runtime, track_title, artist_name) def _write_embedded_metadata(audio_file, metadata: dict, pp: dict, cfg, symbols): filtered_tags: Dict[str, str] = {} for tag_name, value in pp["id_tags"].items(): config_path = SOURCE_TAG_CONFIG.get(tag_name) if config_path and not _tag_enabled(cfg, config_path): continue filtered_tags[tag_name] = value written = [] release_year = pp["release_year"] if isinstance(audio_file.tags, symbols.ID3): for tag_name, value in filtered_tags.items(): spec = ID3_TAG_MAP.get(tag_name) if spec: frame_type, desc = spec if frame_type == "UFID": audio_file.tags.add(symbols.UFID(owner=desc, data=str(value).encode("ascii"))) written.append(f"UFID:{desc}") elif frame_type == "TDOR": audio_file.tags.add(symbols.TDOR(encoding=3, text=[value])) written.append("TDOR") elif frame_type == "TMED": audio_file.tags.add(symbols.TMED(encoding=3, text=[value])) written.append("TMED") else: audio_file.tags.add(symbols.TXXX(encoding=3, desc=desc, text=[value])) written.append(f"TXXX:{desc}") else: audio_file.tags.add(symbols.TXXX(encoding=3, desc=tag_name, text=[str(value)])) written.append(f"TXXX:{tag_name}") elif isinstance(audio_file, symbols.MP4): for tag_name, value in filtered_tags.items(): key = f"----:com.apple.iTunes:{MP4_TAG_MAP.get(tag_name, tag_name)}" audio_file[key] = [symbols.MP4FreeForm(str(value).encode("utf-8"))] written.append(key) elif is_vorbis_like(audio_file, symbols): for tag_name, value in filtered_tags.items(): audio_file[VORBIS_TAG_MAP.get(tag_name, tag_name)] = [str(value)] written.append(VORBIS_TAG_MAP.get(tag_name, tag_name)) if written: logger.info("Embedded IDs: %s", ", ".join(written)) if release_year and not metadata.get("date"): metadata["date"] = release_year if isinstance(audio_file.tags, symbols.ID3): audio_file.tags.add(symbols.TDRC(encoding=3, text=[release_year])) elif is_vorbis_like(audio_file, symbols): audio_file["date"] = [release_year] elif isinstance(audio_file, symbols.MP4): audio_file["\xa9day"] = [release_year] logger.info("Date tag: %s", release_year) bpm_candidates = [] if pp["deezer_bpm"] and pp["deezer_bpm"] > 0 and _tag_enabled(cfg, "deezer.tags.bpm"): bpm_candidates.append(("Deezer", pp["deezer_bpm"])) if pp["tidal_bpm"] and pp["tidal_bpm"] > 0 and _tag_enabled(cfg, "tidal.tags.bpm"): bpm_candidates.append(("Tidal", pp["tidal_bpm"])) if pp["hifi_bpm"] and pp["hifi_bpm"] > 0 and _tag_enabled(cfg, "hifi.tags.bpm"): bpm_candidates.append(("HiFi", pp["hifi_bpm"])) if bpm_candidates: bpm_source, bpm_val = bpm_candidates[0] bpm_int = int(bpm_val) if isinstance(audio_file.tags, symbols.ID3): audio_file.tags.add(symbols.TBPM(encoding=3, text=[str(bpm_int)])) elif is_vorbis_like(audio_file, symbols): audio_file["BPM"] = [str(bpm_int)] elif isinstance(audio_file, symbols.MP4): audio_file["tmpo"] = [bpm_int] logger.info("BPM (%s): %s", bpm_source, bpm_int) if _tag_enabled(cfg, "audiodb.tags.mood") and pp["audiodb_mood"]: if isinstance(audio_file.tags, symbols.ID3): audio_file.tags.add(symbols.TXXX(encoding=3, desc="MOOD", text=[pp["audiodb_mood"]])) elif is_vorbis_like(audio_file, symbols): audio_file["MOOD"] = [pp["audiodb_mood"]] elif isinstance(audio_file, symbols.MP4): audio_file["----:com.apple.iTunes:MOOD"] = [symbols.MP4FreeForm(pp["audiodb_mood"].encode("utf-8"))] if _tag_enabled(cfg, "audiodb.tags.style") and pp["audiodb_style"]: if isinstance(audio_file.tags, symbols.ID3): audio_file.tags.add(symbols.TXXX(encoding=3, desc="STYLE", text=[pp["audiodb_style"]])) elif is_vorbis_like(audio_file, symbols): audio_file["STYLE"] = [pp["audiodb_style"]] elif isinstance(audio_file, symbols.MP4): audio_file["----:com.apple.iTunes:STYLE"] = [symbols.MP4FreeForm(pp["audiodb_style"].encode("utf-8"))] if _tag_enabled(cfg, "metadata_enhancement.tags.genre_merge"): enrichment_genres = [] if _tag_enabled(cfg, "musicbrainz.tags.genres"): enrichment_genres += pp["mb_genres"] if pp["audiodb_genre"] and _tag_enabled(cfg, "audiodb.tags.genre"): enrichment_genres.append(pp["audiodb_genre"]) if _tag_enabled(cfg, "lastfm.tags.genres"): enrichment_genres += pp["lastfm_tags"] if enrichment_genres: from core.genre_filter import filter_genres as _filter_genres enrichment_genres = _filter_genres(enrichment_genres, cfg) source_genres = [g.strip() for g in str(metadata.get("genre", "")).split(",") if g.strip()] seen = set() merged = [] for genre in source_genres + enrichment_genres: key = genre.strip().lower() if key and key not in seen: seen.add(key) merged.append(genre.strip().title()) if len(merged) >= 5: break if merged: genre_string = ", ".join(merged) if isinstance(audio_file.tags, symbols.ID3): audio_file.tags.add(symbols.TCON(encoding=3, text=[genre_string])) elif is_vorbis_like(audio_file, symbols): audio_file["GENRE"] = [genre_string] elif isinstance(audio_file, symbols.MP4): audio_file["\xa9gen"] = [genre_string] logger.info("Genres merged: %s", genre_string) isrc_candidates = [] if pp["isrc"] and _tag_enabled(cfg, "musicbrainz.tags.isrc"): isrc_candidates.append(("MusicBrainz", pp["isrc"])) if pp["deezer_isrc"] and _tag_enabled(cfg, "deezer.tags.isrc"): isrc_candidates.append(("Deezer", pp["deezer_isrc"])) if pp["tidal_isrc"] and _tag_enabled(cfg, "tidal.tags.isrc"): isrc_candidates.append(("Tidal", pp["tidal_isrc"])) if pp["hifi_isrc"] and _tag_enabled(cfg, "hifi.tags.isrc"): isrc_candidates.append(("HiFi", pp["hifi_isrc"])) if pp["qobuz_isrc"] and _tag_enabled(cfg, "qobuz.tags.isrc"): isrc_candidates.append(("Qobuz", pp["qobuz_isrc"])) if isrc_candidates: isrc_source, final_isrc = isrc_candidates[0] if isinstance(audio_file.tags, symbols.ID3): audio_file.tags.add(symbols.TSRC(encoding=3, text=[final_isrc])) elif is_vorbis_like(audio_file, symbols): audio_file["ISRC"] = [final_isrc] elif isinstance(audio_file, symbols.MP4): audio_file["----:com.apple.iTunes:ISRC"] = [symbols.MP4FreeForm(final_isrc.encode("utf-8"))] logger.info("ISRC (%s): %s", isrc_source, final_isrc) copyright_candidates = [] if pp["tidal_copyright"] and _tag_enabled(cfg, "tidal.tags.copyright"): copyright_candidates.append(("Tidal", pp["tidal_copyright"])) if pp["qobuz_copyright"] and _tag_enabled(cfg, "qobuz.tags.copyright"): copyright_candidates.append(("Qobuz", pp["qobuz_copyright"])) if pp["hifi_copyright"] and _tag_enabled(cfg, "hifi.tags.copyright"): copyright_candidates.append(("HiFi", pp["hifi_copyright"])) if copyright_candidates: copyright_source, final_copyright = copyright_candidates[0] if isinstance(audio_file.tags, symbols.ID3): audio_file.tags.add(symbols.TCOP(encoding=3, text=[final_copyright])) elif is_vorbis_like(audio_file, symbols): audio_file["COPYRIGHT"] = [final_copyright] elif isinstance(audio_file, symbols.MP4): audio_file["cprt"] = [final_copyright] logger.info("Copyright (%s): %s", copyright_source, final_copyright[:60]) if _tag_enabled(cfg, "qobuz.tags.label") and pp["qobuz_label"]: if isinstance(audio_file.tags, symbols.ID3): audio_file.tags.add(symbols.TPUB(encoding=3, text=[pp["qobuz_label"]])) elif is_vorbis_like(audio_file, symbols): audio_file["LABEL"] = [pp["qobuz_label"]] elif isinstance(audio_file, symbols.MP4): audio_file["----:com.apple.iTunes:LABEL"] = [symbols.MP4FreeForm(pp["qobuz_label"].encode("utf-8"))] if _tag_enabled(cfg, "lastfm.tags.url") and pp["lastfm_url"]: if isinstance(audio_file.tags, symbols.ID3): audio_file.tags.add(symbols.TXXX(encoding=3, desc="LASTFM_URL", text=[pp["lastfm_url"]])) elif is_vorbis_like(audio_file, symbols): audio_file["LASTFM_URL"] = [pp["lastfm_url"]] elif isinstance(audio_file, symbols.MP4): audio_file["----:com.apple.iTunes:LASTFM_URL"] = [symbols.MP4FreeForm(pp["lastfm_url"].encode("utf-8"))] if _tag_enabled(cfg, "genius.tags.url") and pp["genius_url"]: if isinstance(audio_file.tags, symbols.ID3): audio_file.tags.add(symbols.TXXX(encoding=3, desc="GENIUS_URL", text=[pp["genius_url"]])) elif is_vorbis_like(audio_file, symbols): audio_file["GENIUS_URL"] = [pp["genius_url"]] elif isinstance(audio_file, symbols.MP4): audio_file["----:com.apple.iTunes:GENIUS_URL"] = [symbols.MP4FreeForm(pp["genius_url"].encode("utf-8"))] return release_year def _update_album_year_in_database(db, metadata: dict, release_year) -> None: if db is None: return try: album_name_for_db = metadata.get("album", "") album_artist_for_db = metadata.get("album_artist", "") or metadata.get("artist", "") if album_name_for_db and album_artist_for_db: conn = db._get_connection() try: cursor = conn.cursor() cursor.execute( """ UPDATE albums SET year = ? WHERE (year IS NULL OR year = 0) AND id IN ( SELECT al.id FROM albums al JOIN artists ar ON ar.id = al.artist_id WHERE LOWER(al.title) = LOWER(?) AND LOWER(ar.name) = LOWER(?) ) """, (int(release_year), album_name_for_db, album_artist_for_db), ) if cursor.rowcount > 0: conn.commit() logger.info("Updated album year to %s in database", release_year) else: conn.rollback() finally: conn.close() except Exception as exc: logger.error("Could not update album year in DB: %s", exc) def extract_source_metadata(context: dict, artist: dict, album_info: dict) -> dict: if album_info is None: album_info = {} cfg = get_config_manager() context = normalize_import_context(context) original_search = get_import_original_search(context) album_ctx = get_import_context_album(context) track_info = get_import_track_info(context) source = get_import_source(context) source_ids = get_import_source_ids(context) artist_dict = artist if isinstance(artist, dict) else { "name": extract_artist_name(artist), "id": getattr(artist, "id", ""), "genres": list(getattr(artist, "genres", []) or []), } metadata: Dict[str, Any] = { "source": source, "source_track_id": source_ids["track_id"], "source_artist_id": source_ids["artist_id"], "source_album_id": source_ids["album_id"], } metadata["title"] = get_import_clean_title(context, album_info=album_info, default=original_search.get("title", "")) if original_search.get("clean_title"): logger.info("Metadata: Using clean title: '%s'", metadata["title"]) elif album_info.get("clean_track_name"): logger.info("Metadata: Using album info clean name: '%s'", metadata["title"]) else: logger.warning("Metadata: Using original title as fallback: '%s'", metadata["title"]) # Resolve canonical artists list. Soulseek matched-download contexts # only carry `original_search.artist` (singular string) — the full # contributors list lives on `track_info` (the matched Spotify/etc # track object). Deezer-direct contexts populate `original_search.artists` # directly. Pure helper handles all three shapes. all_artists = resolve_track_artists(original_search, track_info, artist_dict) if all_artists: # Deezer upgrade path: Deezer's `/search` endpoint only returns # the primary artist for each track. The full contributors # array (feat., remix collaborators, producers credited as # artists) lives on `/track/` and gets parsed by # `_build_enhanced_track`. Without this upgrade Deezer-sourced # tracks never get multi-artist tags even with the right # settings on. One extra API call per Deezer-sourced track, # only when the search response had a single artist (so it's # a no-op when search already returned multiple). if (source == "deezer" and len(all_artists) == 1 and source_ids.get("track_id")): try: from core.metadata import get_deezer_client deezer = get_deezer_client() if deezer: full = deezer.get_track_details(str(source_ids["track_id"])) if full and isinstance(full.get("artists"), list) and len(full["artists"]) > 1: upgraded = [a for a in full["artists"] if a] if upgraded: logger.info( "Metadata: Deezer contributors upgrade — search returned " "%d artist, /track/ returned %d (%s)", len(all_artists), len(upgraded), upgraded, ) all_artists = upgraded except Exception as e: logger.debug("Deezer contributors upgrade failed: %s", e) # Store the multi-artist list so the enrichment writer can emit # proper multi-value ARTIST tags (TPE1 multi-value for ID3, # "artists" key for Vorbis) when `write_multi_artist` is on. # Without this assignment the field was always empty and the # multi-artist write path silently no-op'd. metadata["_artists_list"] = list(all_artists) # `feat_in_title` (when true): pull featured artists out of the # ARTIST tag entirely and append "(feat. X, Y)" to the title. # Matches Picard / Beets convention and lets media servers # group by primary artist instead of treating "A, B & C" as a # distinct artist string. # `artist_separator`: when feat_in_title is off (or there's # only one artist) and write_multi_artist is on, this is the # delimiter used to join all artists into the single ARTIST # string. Picard defaults to "; " — we default to ", " to # preserve historical behavior for users who haven't touched # the setting. feat_in_title = cfg.get("metadata_enhancement.tags.feat_in_title", False) artist_separator = cfg.get("metadata_enhancement.tags.artist_separator", ", ") if feat_in_title and len(all_artists) > 1: metadata["artist"] = all_artists[0] featured = all_artists[1:] existing_title = metadata.get("title", "") or "" # Don't double-append if the title already carries the # featured artists. Source titles vary: "(feat. X)", # "(featuring X)", "(ft. X)", "ft. X" (no parens), "[feat X]" # (no period, brackets), etc. Word-boundary regex catches # `feat`, `feat.`, `featuring`, `ft`, `ft.` regardless of # surrounding punctuation. Case-insensitive. import re as _feat_re already_has_feat = bool(_feat_re.search( r'\b(?:feat|feat\.|featuring|ft|ft\.)\b', existing_title, _feat_re.IGNORECASE, )) if existing_title and not already_has_feat: metadata["title"] = f"{existing_title} (feat. {', '.join(featured)})" logger.info( "Metadata: feat_in_title — primary='%s', featured=%s, title='%s'", metadata["artist"], featured, metadata["title"], ) else: metadata["artist"] = artist_separator.join(all_artists) logger.info( "Metadata: Using all artists joined with %r: '%s'", artist_separator, metadata["artist"], ) else: metadata["artist"] = artist_dict.get("name", "") or get_import_clean_artist(context) logger.info("Metadata: Using primary artist: '%s'", metadata["artist"]) raw_album_artist = artist_dict.get("name", "") or metadata["artist"] track_info_ctx = track_info or {} explicit_artist = track_info_ctx.get("_explicit_artist_context") if isinstance(track_info_ctx, dict) else None album_artists_for_collab = None if isinstance(explicit_artist, dict) and explicit_artist.get("name"): raw_album_artist = explicit_artist["name"] album_artists_for_collab = [explicit_artist] elif isinstance(explicit_artist, str) and explicit_artist: raw_album_artist = explicit_artist album_artists_for_collab = [{"name": explicit_artist}] elif album_ctx and isinstance(album_ctx, dict): album_artists = album_ctx.get("artists", []) if album_artists: first_album_artist = album_artists[0] if isinstance(first_album_artist, dict) and first_album_artist.get("name"): raw_album_artist = first_album_artist["name"] elif isinstance(first_album_artist, str) and first_album_artist: raw_album_artist = first_album_artist album_artists_for_collab = album_artists collab_mode = cfg.get("file_organization.collab_artist_mode", "first") if collab_mode == "first" and raw_album_artist: context_artists = album_artists_for_collab or original_search.get("artists") or track_info_ctx.get("artists") or [] if len(context_artists) > 1: first = context_artists[0] raw_album_artist = first.get("name", first) if isinstance(first, dict) else str(first) elif len(context_artists) == 1 and ("," in raw_album_artist or " & " in raw_album_artist): artist_id = str(artist_dict.get("id", "")) if source == "itunes" and artist_id.isdigit(): try: itunes_client = get_itunes_client() if itunes_client and hasattr(itunes_client, "resolve_primary_artist"): resolved = itunes_client.resolve_primary_artist(artist_id) if resolved and resolved != raw_album_artist: raw_album_artist = resolved except Exception as e: logger.debug("itunes primary artist resolve failed: %s", e) metadata["album_artist"] = raw_album_artist if album_info.get("is_album"): metadata["album"] = album_info.get("album_name", "Unknown Album") metadata["track_number"] = album_info.get("track_number", 1) metadata["total_tracks"] = album_ctx.get("total_tracks", 1) if album_ctx else 1 logger.info("[METADATA] Album track - track_number: %s, album: %s", metadata["track_number"], metadata["album"]) else: if album_ctx and album_ctx.get("name"): logger.info("[SAFEGUARD] Using album context name instead of track title for album metadata") metadata["album"] = album_ctx["name"] metadata["track_number"] = album_info.get("track_number", 1) if album_info else 1 metadata["total_tracks"] = album_ctx.get("total_tracks", 1) else: metadata["album"] = metadata["title"] metadata["track_number"] = 1 metadata["total_tracks"] = 1 disc_num = original_search.get("disc_number") if disc_num is None and album_info: disc_num = album_info.get("disc_number") metadata["disc_number"] = disc_num if disc_num is not None else 1 if album_ctx and album_ctx.get("release_date"): release_date = _normalize_release_date_tag(album_ctx.get("release_date")) if release_date: metadata["date"] = release_date genres = artist_dict.get("genres") or [] if genres: from core.genre_filter import filter_genres filtered = filter_genres(list(genres[:2]), cfg) if filtered: metadata["genre"] = ", ".join(filtered) metadata["album_art_url"] = album_info.get("album_image_url") if album_info else None if not metadata["album_art_url"] and album_ctx: album_image = album_ctx.get("image_url") if not album_image and album_ctx.get("images"): first_image = album_ctx["images"][0] album_image = first_image.get("url") if isinstance(first_image, dict) else None metadata["album_art_url"] = album_image logger.info( "[Metadata Summary] title='%s' | artist='%s' | album_artist='%s' | album='%s' | date=%s | track=%s/%s | disc=%s", metadata.get("title"), metadata.get("artist"), metadata.get("album_artist"), metadata.get("album"), metadata.get("date", ""), metadata.get("track_number"), metadata.get("total_tracks"), metadata.get("disc_number"), ) return metadata def embed_source_ids(audio_file, metadata: dict, context: dict = None, runtime=None): cfg = get_config_manager() symbols = get_mutagen_symbols() if not symbols: return try: context = normalize_import_context(context) source_ids = _collect_source_ids(metadata, cfg) track_title = metadata.get("title", "") artist_name = metadata.get("album_artist", "") or metadata.get("artist", "") track_info = get_import_track_info(context) explicit_artist = (track_info or {}).get("_explicit_artist_context") if isinstance(track_info, dict) else None batch_artist_name = None if isinstance(explicit_artist, dict) and explicit_artist.get("name"): batch_artist_name = explicit_artist["name"] elif isinstance(explicit_artist, str) and explicit_artist: batch_artist_name = explicit_artist pp = { "id_tags": source_ids, "track_title": track_title, "artist_name": artist_name, "batch_artist_name": batch_artist_name, "metadata": metadata, "recording_mbid": None, "artist_mbid": None, "release_mbid": "", "mb_genres": [], "isrc": None, "deezer_bpm": None, "deezer_isrc": None, "tidal_bpm": None, "hifi_bpm": None, "hifi_copyright": None, "audiodb_mood": None, "audiodb_style": None, "audiodb_genre": None, "tidal_isrc": None, "tidal_copyright": None, "hifi_isrc": None, "qobuz_isrc": None, "qobuz_copyright": None, "qobuz_label": None, "lastfm_tags": [], "lastfm_url": None, "genius_url": None, "release_year": None, } source_order = cfg.get("metadata_enhancement.post_process_order", None) if not isinstance(source_order, list) or not source_order: source_order = DEFAULT_SOURCE_ORDER # If this download came from HiFi, use cached metadata from the download # pipeline instead of re-searching the HiFi API. original_search = get_import_original_search(context) cached_meta = original_search.get("_source_metadata") or {} if cached_meta.get("source") == "hifi": if _tag_enabled(cfg, "hifi.embed_tags"): if cfg.get("hifi.tags.track_id", True) and cached_meta.get("track_id"): pp["id_tags"]["HIFI_TRACK_ID"] = str(cached_meta["track_id"]) if cfg.get("hifi.tags.artist_id", True) and cached_meta.get("artist_id"): pp["id_tags"]["HIFI_ARTIST_ID"] = str(cached_meta["artist_id"]) if cfg.get("hifi.tags.isrc", True) and cached_meta.get("isrc"): pp["hifi_isrc"] = cached_meta["isrc"] if cfg.get("hifi.tags.bpm", True) and cached_meta.get("bpm"): pp["hifi_bpm"] = cached_meta["bpm"] if cfg.get("hifi.tags.copyright", True) and cached_meta.get("copyright"): pp["hifi_copyright"] = cached_meta["copyright"] source_order = [s for s in source_order if s != "hifi"] # If this download came from Tidal, use cached metadata from the download # pipeline instead of re-searching the Tidal API. if cached_meta.get("source") == "tidal": if _tag_enabled(cfg, "tidal.embed_tags"): if cfg.get("tidal.tags.track_id", True) and cached_meta.get("track_id"): pp["id_tags"]["TIDAL_TRACK_ID"] = str(cached_meta["track_id"]) if cfg.get("tidal.tags.artist_id", True) and cached_meta.get("artist_id"): pp["id_tags"]["TIDAL_ARTIST_ID"] = str(cached_meta["artist_id"]) if cfg.get("tidal.tags.isrc", True) and cached_meta.get("isrc"): pp["tidal_isrc"] = cached_meta["isrc"] if cfg.get("tidal.tags.bpm", True) and cached_meta.get("bpm"): pp["tidal_bpm"] = cached_meta["bpm"] if cfg.get("tidal.tags.copyright", True) and cached_meta.get("copyright"): pp["tidal_copyright"] = cached_meta["copyright"] source_order = [s for s in source_order if s != "tidal"] db = get_database() for source_name in source_order: _process_source_enrichment(source_name, pp, metadata, cfg, runtime, track_title, artist_name) if not pp["id_tags"] and not pp["deezer_bpm"] and not pp["deezer_isrc"] and not pp["tidal_bpm"] and not pp["hifi_bpm"] and not pp["hifi_copyright"] and not pp["audiodb_mood"] and not pp["audiodb_style"]: return release_year = _write_embedded_metadata(audio_file, metadata, pp, cfg, symbols) release_id = pp["release_mbid"] if release_id: metadata["musicbrainz_release_id"] = release_id _update_album_year_in_database(db, metadata, release_year) # Expose the final ID tag set + per-source values back to the # import context so downstream side-effects (notably # ``record_download_provenance``) can persist them to the # ``track_downloads`` table without re-collecting. Without this, # the watchlist scanner would have to wait for the async # enrichment workers to backfill ``tracks.spotify_track_id`` etc. # before recognizing freshly downloaded files. if isinstance(context, dict): try: context["_embedded_id_tags"] = dict(pp.get("id_tags") or {}) isrc_value = ( pp.get("isrc") or pp.get("deezer_isrc") or pp.get("tidal_isrc") or pp.get("hifi_isrc") or pp.get("qobuz_isrc") ) if isrc_value: context["_isrc"] = str(isrc_value) except Exception as e: logger.debug("context isrc copy failed: %s", e) except Exception as exc: logger.error("Error embedding source IDs (non-fatal): %s", exc)