You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/metadata/source.py

1241 lines
58 KiB

"""Source metadata extraction and source-ID embedding helpers."""
from __future__ import annotations
import re
import socket
import threading
import time
from collections import OrderedDict
from typing import Any, Dict
import requests
from core.imports.context import (
extract_artist_name,
get_import_clean_artist,
get_import_clean_title,
get_import_context_album,
get_import_original_search,
get_import_source,
get_import_source_ids,
get_import_track_info,
get_source_tag_names,
normalize_import_context,
)
from core.metadata.artist_resolution import resolve_track_artists
from core.metadata.registry import get_itunes_client
from database.music_database import get_database
from core.metadata.common import (
get_config_manager,
get_mutagen_symbols,
is_vorbis_like,
)
from utils.logging_config import get_logger as _create_logger
__all__ = [
"extract_source_metadata",
"embed_source_ids",
"normalize_album_cache_key",
"mb_release_cache",
"mb_release_cache_lock",
"mb_release_detail_cache",
"mb_release_detail_cache_lock",
]
_MB_RELEASE_CACHE_MAX_ENTRIES = 4096
_MB_RELEASE_DETAIL_CACHE_MAX_ENTRIES = 4096
mb_release_cache: "OrderedDict[tuple, str]" = OrderedDict()
mb_release_cache_lock = threading.RLock()
mb_release_detail_cache: "OrderedDict[str, Dict[str, Any]]" = OrderedDict()
mb_release_detail_cache_lock = threading.RLock()
logger = _create_logger("metadata.source")
_SOURCE_NETWORK_EXCEPTIONS = (requests.RequestException, socket.timeout, TimeoutError)
_EDITION_PAREN_RE = re.compile(
r'\s*[\(\[]\s*(?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector|'
r'limited|bonus|platinum|gold|super\s*deluxe|standard)'
r'(?:\s+(?:edition|version))?[^)\]]*[\)\]]',
re.IGNORECASE,
)
_EDITION_BARE_RE = re.compile(
r'\s+(?:-\s+)?(?:deluxe|expanded|remaster(?:ed)?|anniversary|special|collector|'
r'limited|bonus|platinum|gold|super\s*deluxe|standard)'
r'(?:\s+(?:edition|version))?\s*$',
re.IGNORECASE,
)
def normalize_album_cache_key(album_name: str) -> str:
result = _EDITION_PAREN_RE.sub("", album_name or "")
result = _EDITION_BARE_RE.sub("", result)
return result.lower().strip()
def _bounded_cache_get(cache, key):
value = cache.get(key)
if value is not None and hasattr(cache, "move_to_end"):
cache.move_to_end(key)
return value
def _bounded_cache_set(cache, key, value, max_entries: int) -> None:
cache[key] = value
if hasattr(cache, "move_to_end"):
cache.move_to_end(key)
while len(cache) > max_entries:
cache.popitem(last=False)
def _call_source_lookup(label: str, func, *args, **kwargs):
try:
return func(*args, **kwargs)
except _SOURCE_NETWORK_EXCEPTIONS as exc:
logger.warning("%s lookup failed (network): %s", label, exc)
return None
SOURCE_TAG_CONFIG = {
"SPOTIFY_TRACK_ID": "spotify.tags.track_id",
"SPOTIFY_ARTIST_ID": "spotify.tags.artist_id",
"SPOTIFY_ALBUM_ID": "spotify.tags.album_id",
"ITUNES_TRACK_ID": "itunes.tags.track_id",
"ITUNES_ARTIST_ID": "itunes.tags.artist_id",
"ITUNES_ALBUM_ID": "itunes.tags.album_id",
"MUSICBRAINZ_RECORDING_ID": "musicbrainz.tags.recording_id",
"MUSICBRAINZ_ARTIST_ID": "musicbrainz.tags.artist_id",
"MUSICBRAINZ_RELEASE_ID": "musicbrainz.tags.release_id",
"MUSICBRAINZ_RELEASEGROUPID": "musicbrainz.tags.release_group_id",
"MUSICBRAINZ_ALBUMARTISTID": "musicbrainz.tags.album_artist_id",
"MUSICBRAINZ_RELEASETRACKID": "musicbrainz.tags.release_track_id",
"RELEASETYPE": "musicbrainz.tags.release_type",
"ORIGINALDATE": "musicbrainz.tags.original_date",
"RELEASESTATUS": "musicbrainz.tags.release_status",
"RELEASECOUNTRY": "musicbrainz.tags.release_country",
"BARCODE": "musicbrainz.tags.barcode",
"MEDIA": "musicbrainz.tags.media",
"TOTALDISCS": "musicbrainz.tags.total_discs",
"CATALOGNUMBER": "musicbrainz.tags.catalog_number",
"SCRIPT": "musicbrainz.tags.script",
"ASIN": "musicbrainz.tags.asin",
"DEEZER_TRACK_ID": "deezer.tags.track_id",
"DEEZER_ARTIST_ID": "deezer.tags.artist_id",
"AUDIODB_TRACK_ID": "audiodb.tags.track_id",
"TIDAL_TRACK_ID": "tidal.tags.track_id",
"TIDAL_ARTIST_ID": "tidal.tags.artist_id",
"HIFI_TRACK_ID": "hifi.tags.track_id",
"HIFI_ARTIST_ID": "hifi.tags.artist_id",
"QOBUZ_TRACK_ID": "qobuz.tags.track_id",
"QOBUZ_ARTIST_ID": "qobuz.tags.artist_id",
"GENIUS_TRACK_ID": "genius.tags.track_id",
}
DEFAULT_SOURCE_ORDER = ["musicbrainz", "deezer", "audiodb", "tidal", "hifi", "qobuz", "lastfm", "genius"]
ID3_TAG_MAP = {
"MUSICBRAINZ_RECORDING_ID": ("UFID", "http://musicbrainz.org"),
"MUSICBRAINZ_ARTIST_ID": ("TXXX", "MusicBrainz Artist Id"),
"MUSICBRAINZ_RELEASE_ID": ("TXXX", "MusicBrainz Album Id"),
"MUSICBRAINZ_RELEASEGROUPID": ("TXXX", "MusicBrainz Release Group Id"),
"MUSICBRAINZ_ALBUMARTISTID": ("TXXX", "MusicBrainz Album Artist Id"),
"MUSICBRAINZ_RELEASETRACKID": ("TXXX", "MusicBrainz Release Track Id"),
"RELEASETYPE": ("TXXX", "MusicBrainz Album Type"),
"RELEASESTATUS": ("TXXX", "MusicBrainz Album Status"),
"RELEASECOUNTRY": ("TXXX", "MusicBrainz Album Release Country"),
"ORIGINALDATE": ("TDOR", None),
"MEDIA": ("TMED", None),
}
VORBIS_TAG_MAP = {
"MUSICBRAINZ_RECORDING_ID": "MUSICBRAINZ_TRACKID",
"MUSICBRAINZ_ARTIST_ID": "MUSICBRAINZ_ARTISTID",
"MUSICBRAINZ_RELEASE_ID": "MUSICBRAINZ_ALBUMID",
"MUSICBRAINZ_RELEASEGROUPID": "MUSICBRAINZ_RELEASEGROUPID",
"MUSICBRAINZ_ALBUMARTISTID": "MUSICBRAINZ_ALBUMARTISTID",
"MUSICBRAINZ_RELEASETRACKID": "MUSICBRAINZ_RELEASETRACKID",
}
MP4_TAG_MAP = {
"MUSICBRAINZ_RECORDING_ID": "MusicBrainz Track Id",
"MUSICBRAINZ_ARTIST_ID": "MusicBrainz Artist Id",
"MUSICBRAINZ_RELEASE_ID": "MusicBrainz Album Id",
"MUSICBRAINZ_RELEASEGROUPID": "MusicBrainz Release Group Id",
"MUSICBRAINZ_ALBUMARTISTID": "MusicBrainz Album Artist Id",
"MUSICBRAINZ_RELEASETRACKID": "MusicBrainz Release Track Id",
"RELEASETYPE": "MusicBrainz Album Type",
"RELEASESTATUS": "MusicBrainz Album Status",
"RELEASECOUNTRY": "MusicBrainz Album Release Country",
}
def _tag_enabled(cfg, path: str) -> bool:
return cfg.get(path, True) is not False
def _names_match(a: str, b: str, threshold: float = 0.75) -> bool:
if not a or not b:
return False
from difflib import SequenceMatcher
norm = lambda s: re.sub(r"[^a-z0-9 ]", "", re.sub(r"\(.*?\)", "", s).lower()).strip()
return SequenceMatcher(None, norm(a), norm(b)).ratio() >= threshold
def _normalize_release_date_tag(value: Any) -> str:
"""Return a tag-safe release date without inventing missing precision."""
raw = str(value or "").strip()
if not raw:
return ""
# Source APIs commonly return ISO timestamps. Audio DATE/TDRC tags should
# receive only the date precision the source actually provided.
raw = raw.split("T", 1)[0].strip()
match = re.match(r"^(\d{4})(?:-(\d{2})(?:-(\d{2}))?)?$", raw)
if not match:
return ""
year, month, day = match.groups()
if day:
return f"{year}-{month}-{day}"
if month:
return f"{year}-{month}"
return year
def _collect_source_ids(metadata: dict, cfg) -> dict:
source_ids = {}
source = (metadata.get("source") or "").strip().lower()
if source:
source_tag_names = get_source_tag_names(source)
source_track_id = metadata.get("source_track_id")
source_artist_id = metadata.get("source_artist_id")
source_album_id = metadata.get("source_album_id")
if cfg.get(f"{source}.embed_tags", True) is not False:
if source_tag_names.get("track") and source_track_id:
source_ids[source_tag_names["track"]] = source_track_id
if source_tag_names.get("artist") and source_artist_id:
source_ids[source_tag_names["artist"]] = source_artist_id
if source_tag_names.get("album") and source_album_id:
source_ids[source_tag_names["album"]] = source_album_id
if not source_ids:
if cfg.get("spotify.embed_tags", True) is not False:
if metadata.get("spotify_track_id"):
source_ids["SPOTIFY_TRACK_ID"] = metadata["spotify_track_id"]
if metadata.get("spotify_artist_id"):
source_ids["SPOTIFY_ARTIST_ID"] = metadata["spotify_artist_id"]
if metadata.get("spotify_album_id"):
source_ids["SPOTIFY_ALBUM_ID"] = metadata["spotify_album_id"]
if cfg.get("itunes.embed_tags", True) is not False:
if metadata.get("itunes_track_id"):
source_ids["ITUNES_TRACK_ID"] = metadata["itunes_track_id"]
if metadata.get("itunes_artist_id"):
source_ids["ITUNES_ARTIST_ID"] = metadata["itunes_artist_id"]
if metadata.get("itunes_album_id"):
source_ids["ITUNES_ALBUM_ID"] = metadata["itunes_album_id"]
return source_ids
def _process_musicbrainz_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None:
if cfg.get("musicbrainz.embed_tags", True) is False:
return
if not track_title or not artist_name:
return
mb_worker = getattr(runtime, "mb_worker", None)
mb_service = mb_worker.mb_service if mb_worker else None
if not mb_service:
return
result = _call_source_lookup("MusicBrainz recording", mb_service.match_recording, track_title, artist_name)
if result and result.get("mbid"):
pp["recording_mbid"] = result["mbid"]
pp["id_tags"]["MUSICBRAINZ_RECORDING_ID"] = pp["recording_mbid"]
details = _call_source_lookup(
"MusicBrainz recording details",
mb_service.mb_client.get_recording,
pp["recording_mbid"],
includes=["isrcs", "genres"],
)
if details:
isrcs = details.get("isrcs", [])
if isrcs:
pp["isrc"] = isrcs[0]
pp["mb_genres"] = [g["name"] for g in sorted(details.get("genres", []), key=lambda x: x.get("count", 0), reverse=True)]
track_artist_name = metadata.get("artist", "") or artist_name
if ", " in track_artist_name:
track_artist_name = track_artist_name.split(", ")[0]
artist_result = _call_source_lookup("MusicBrainz artist", mb_service.match_artist, track_artist_name)
if artist_result and artist_result.get("mbid"):
pp["artist_mbid"] = artist_result["mbid"]
pp["id_tags"]["MUSICBRAINZ_ARTIST_ID"] = pp["artist_mbid"]
album_name_for_mb = metadata.get("album", "")
if album_name_for_mb:
artist_key = (pp.get("batch_artist_name") or artist_name).lower().strip()
normalized_album_key = normalize_album_cache_key(album_name_for_mb)
rc_key_norm = (normalized_album_key, artist_key)
rc_key_exact = (album_name_for_mb.lower().strip(), artist_key)
release_mbid = None
with mb_release_cache_lock:
cached = _bounded_cache_get(mb_release_cache, rc_key_norm)
if cached is None:
cached = _bounded_cache_get(mb_release_cache, rc_key_exact)
if cached:
release_mbid = cached
else:
# Persistent cache check BEFORE the live MB lookup. If a
# previous SoulSync run already resolved this album's
# release MBID, reuse it — guarantees every track of the
# same album gets the SAME MUSICBRAINZ_ALBUMID tag, even
# across server restarts and after the in-memory bounded
# cache evicts the entry. Strictly additive: any failure
# in the persistent lookup falls through to the live MB
# query exactly as today.
try:
from core.metadata import album_mbid_cache as _persisted_cache
persisted = _persisted_cache.lookup(normalized_album_key, artist_key)
except Exception:
persisted = None
if persisted:
release_mbid = persisted
else:
rc_result = _call_source_lookup("MusicBrainz release", mb_service.match_release, album_name_for_mb, artist_name)
if rc_result and rc_result.get("mbid"):
release_mbid = rc_result["mbid"]
if release_mbid:
_bounded_cache_set(mb_release_cache, rc_key_norm, release_mbid, _MB_RELEASE_CACHE_MAX_ENTRIES)
_bounded_cache_set(mb_release_cache, rc_key_exact, release_mbid, _MB_RELEASE_CACHE_MAX_ENTRIES)
# Also persist for future SoulSync runs. Defensive
# try/except so a DB write failure can't block the
# in-memory store + tag write that follow.
try:
from core.metadata import album_mbid_cache as _persisted_cache
_persisted_cache.record(normalized_album_key, artist_key, release_mbid)
except Exception as e:
logger.debug("MBID cache persist failed: %s", e)
pp["release_mbid"] = release_mbid or ""
if pp["release_mbid"]:
pp["id_tags"]["MUSICBRAINZ_RELEASE_ID"] = pp["release_mbid"]
if pp["release_mbid"]:
with mb_release_detail_cache_lock:
release_detail = _bounded_cache_get(mb_release_detail_cache, pp["release_mbid"])
if release_detail is None:
release_detail = _call_source_lookup(
"MusicBrainz release details",
mb_service.mb_client.get_release,
pp["release_mbid"],
includes=["release-groups", "labels", "media", "artist-credits", "recordings", "genres"],
) or {}
with mb_release_detail_cache_lock:
_bounded_cache_set(mb_release_detail_cache, pp["release_mbid"], release_detail, _MB_RELEASE_DETAIL_CACHE_MAX_ENTRIES)
if release_detail:
rg = release_detail.get("release-group", {})
if rg.get("id"):
pp["id_tags"]["MUSICBRAINZ_RELEASEGROUPID"] = rg["id"]
ac = release_detail.get("artist-credit", [])
if ac and isinstance(ac[0], dict):
aa = ac[0].get("artist", {})
if aa.get("id"):
pp["id_tags"]["MUSICBRAINZ_ALBUMARTISTID"] = aa["id"]
if rg.get("primary-type"):
pp["id_tags"]["RELEASETYPE"] = rg["primary-type"]
if rg.get("first-release-date"):
pp["id_tags"]["ORIGINALDATE"] = rg["first-release-date"]
if not pp["release_year"] and len(rg["first-release-date"]) >= 4:
year = rg["first-release-date"][:4]
if year.isdigit():
pp["release_year"] = year
if release_detail.get("status"):
pp["id_tags"]["RELEASESTATUS"] = release_detail["status"]
if release_detail.get("country"):
pp["id_tags"]["RELEASECOUNTRY"] = release_detail["country"]
if release_detail.get("barcode"):
pp["id_tags"]["BARCODE"] = release_detail["barcode"]
media_list = release_detail.get("media", [])
if media_list:
fmt = media_list[0].get("format", "")
if fmt:
pp["id_tags"]["MEDIA"] = fmt
pp["id_tags"]["TOTALDISCS"] = str(len(media_list))
label_info = release_detail.get("label-info", [])
if label_info and isinstance(label_info[0], dict):
cat = label_info[0].get("catalog-number", "")
if cat:
pp["id_tags"]["CATALOGNUMBER"] = cat
text_rep = release_detail.get("text-representation", {})
if isinstance(text_rep, dict) and text_rep.get("script"):
pp["id_tags"]["SCRIPT"] = text_rep["script"]
if release_detail.get("asin"):
pp["id_tags"]["ASIN"] = release_detail["asin"]
track_num = metadata.get("track_number")
disc_num = metadata.get("disc_number") or 1
if track_num and media_list:
try:
track_num_int = int(track_num)
disc_num_int = int(disc_num)
for medium in media_list:
if medium.get("position", 1) == disc_num_int:
for mtrack in (medium.get("tracks") or medium.get("track-list", [])):
if mtrack.get("position") == track_num_int:
if mtrack.get("id"):
pp["id_tags"]["MUSICBRAINZ_RELEASETRACKID"] = mtrack["id"]
release_recording = mtrack.get("recording", {})
if release_recording.get("id"):
pp["recording_mbid"] = release_recording["id"]
pp["id_tags"]["MUSICBRAINZ_RECORDING_ID"] = release_recording["id"]
break
break
except (ValueError, TypeError):
pass
# Genre fallback chain: most MusicBrainz recordings don't carry genres at
# the track level, but releases and artists usually do. If the recording
# came back empty, try the release; if that's empty too, fetch the artist
# with `includes=['genres']` and use that.
_release_detail_for_genres = locals().get("release_detail")
if not pp["mb_genres"] and _release_detail_for_genres:
pp["mb_genres"] = [
g["name"] for g in sorted(
_release_detail_for_genres.get("genres", []), key=lambda x: x.get("count", 0), reverse=True,
)
]
if not pp["mb_genres"] and pp.get("artist_mbid"):
artist_detail = _call_source_lookup(
"MusicBrainz artist details",
mb_service.mb_client.get_artist,
pp["artist_mbid"],
includes=["genres"],
)
if artist_detail:
pp["mb_genres"] = [
g["name"] for g in sorted(
artist_detail.get("genres", []), key=lambda x: x.get("count", 0), reverse=True,
)
]
def _process_deezer_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None:
if cfg.get("deezer.embed_tags", True) is False:
return
if not track_title or not artist_name:
return
deezer_worker = getattr(runtime, "deezer_worker", None)
dz_client = deezer_worker.client if deezer_worker else None
if not dz_client:
return
dz_result = _call_source_lookup("Deezer track", dz_client.search_track, artist_name, track_title)
if dz_result and _names_match(dz_result.get("title", ""), track_title) and _names_match(dz_result.get("artist", {}).get("name", ""), artist_name):
dz_track_id = dz_result["id"]
pp["id_tags"]["DEEZER_TRACK_ID"] = str(dz_track_id)
dz_artist_id = dz_result.get("artist", {}).get("id")
if dz_artist_id:
pp["id_tags"]["DEEZER_ARTIST_ID"] = str(dz_artist_id)
dz_details = _call_source_lookup("Deezer track details", dz_client.get_track_details, dz_track_id)
if dz_details:
bpm_val = dz_details.get("bpm")
if bpm_val and bpm_val > 0:
pp["deezer_bpm"] = bpm_val
dz_isrc = dz_details.get("isrc")
if dz_isrc:
pp["deezer_isrc"] = dz_isrc
if not pp["release_year"]:
dz_album = dz_result.get("album", {})
dz_release = (dz_album.get("release_date", "") if isinstance(dz_album, dict) else "") or ""
if len(dz_release) >= 4 and dz_release[:4].isdigit():
pp["release_year"] = dz_release[:4]
def _process_audiodb_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None:
if cfg.get("audiodb.embed_tags", True) is False:
return
if not track_title or not artist_name:
return
audiodb_worker = getattr(runtime, "audiodb_worker", None)
adb_client = audiodb_worker.client if audiodb_worker else None
if not adb_client:
return
adb_result = _call_source_lookup("AudioDB track", adb_client.search_track, artist_name, track_title)
if adb_result and _names_match(adb_result.get("strTrack", ""), track_title) and _names_match(adb_result.get("strArtist", ""), artist_name):
adb_track_id = adb_result.get("idTrack")
if adb_track_id:
pp["id_tags"]["AUDIODB_TRACK_ID"] = str(adb_track_id)
adb_mb_track = adb_result.get("strMusicBrainzID")
if adb_mb_track and "MUSICBRAINZ_RECORDING_ID" not in pp["id_tags"]:
pp["id_tags"]["MUSICBRAINZ_RECORDING_ID"] = adb_mb_track
pp["recording_mbid"] = adb_mb_track
adb_mb_artist = adb_result.get("strMusicBrainzArtistID")
if adb_mb_artist and "MUSICBRAINZ_ARTIST_ID" not in pp["id_tags"]:
pp["id_tags"]["MUSICBRAINZ_ARTIST_ID"] = adb_mb_artist
pp["artist_mbid"] = adb_mb_artist
pp["audiodb_mood"] = adb_result.get("strMood") or None
pp["audiodb_style"] = adb_result.get("strStyle") or None
pp["audiodb_genre"] = adb_result.get("strGenre") or None
def _process_tidal_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None:
if cfg.get("tidal.embed_tags", True) is False:
return
if not track_title or not artist_name:
return
tidal_client = getattr(runtime, "tidal_client", None)
if not (tidal_client and tidal_client.is_authenticated()):
return
td_result = _call_source_lookup("Tidal track", tidal_client.search_track, artist_name, track_title)
if td_result and _names_match(td_result.get("title", ""), track_title):
td_track_id = td_result.get("id")
if td_track_id:
pp["id_tags"]["TIDAL_TRACK_ID"] = str(td_track_id)
td_artist = td_result.get("artist", {})
if isinstance(td_artist, dict) and td_artist.get("id"):
pp["id_tags"]["TIDAL_ARTIST_ID"] = str(td_artist["id"])
if td_track_id:
td_details = _call_source_lookup("Tidal track details", tidal_client.get_track, str(td_track_id))
if td_details:
pp["tidal_isrc"] = td_details.get("isrc")
td_bpm = td_details.get("bpm")
if td_bpm and td_bpm > 0:
pp["tidal_bpm"] = td_bpm
td_copyright = td_details.get("copyright")
if isinstance(td_copyright, dict):
td_copyright = td_copyright.get("text", td_copyright.get("name", ""))
pp["tidal_copyright"] = td_copyright or None
if not pp["release_year"]:
td_album = td_result.get("album", {})
td_release = ""
if isinstance(td_album, dict):
td_release = str(td_album.get("release_date", "") or td_album.get("releaseDate", "") or "")
if len(td_release) >= 4 and td_release[:4].isdigit():
pp["release_year"] = td_release[:4]
def _process_hifi_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None:
if cfg.get("hifi.embed_tags", True) is False:
return
if not track_title or not artist_name:
return
hifi_client = getattr(runtime, "hifi_client", None)
if not hifi_client:
return
hifi_results = _call_source_lookup("HiFi track", hifi_client.search_tracks, track_title, artist_name)
if hifi_results and len(hifi_results) > 0:
hifi_track = hifi_results[0]
if _names_match(hifi_track.get("title", ""), track_title):
hifi_track_id = hifi_track.get("id")
if hifi_track_id:
pp["id_tags"]["HIFI_TRACK_ID"] = str(hifi_track_id)
hifi_artist_id = hifi_track.get("artist_id")
if hifi_artist_id:
pp["id_tags"]["HIFI_ARTIST_ID"] = str(hifi_artist_id)
if hifi_track_id:
hifi_details = _call_source_lookup("HiFi track details", hifi_client.get_track_info, hifi_track_id)
if hifi_details:
hifi_isrc = hifi_details.get("isrc")
if hifi_isrc:
pp["hifi_isrc"] = hifi_isrc
hifi_bpm = hifi_details.get("bpm")
if hifi_bpm and hifi_bpm > 0:
pp["hifi_bpm"] = hifi_bpm
hifi_copyright = hifi_details.get("copyright")
if hifi_copyright:
pp["hifi_copyright"] = hifi_copyright
if not pp["release_year"]:
hifi_album_id = hifi_track.get("album_id")
if hifi_album_id:
hifi_album = _call_source_lookup("HiFi album", hifi_client.get_album, hifi_album_id)
if hifi_album:
hifi_release = str(hifi_album.get("release_date", "") or "")
if len(hifi_release) >= 4 and hifi_release[:4].isdigit():
pp["release_year"] = hifi_release[:4]
def _process_qobuz_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None:
if cfg.get("qobuz.embed_tags", True) is False:
return
if not track_title or not artist_name:
return
qobuz_worker = getattr(runtime, "qobuz_enrichment_worker", None)
qz_client = qobuz_worker.client if qobuz_worker else None
if not (qz_client and qz_client.is_authenticated()):
return
qz_result = _call_source_lookup("Qobuz track", qz_client.search_track, artist_name, track_title)
if qz_result:
qz_performer = qz_result.get("performer") or {}
if not isinstance(qz_performer, dict):
qz_performer = {}
qz_artist_name = qz_performer.get("name", "")
if _names_match(qz_result.get("title", ""), track_title) and _names_match(qz_artist_name, artist_name):
qz_track_id = qz_result.get("id")
if qz_track_id:
pp["id_tags"]["QOBUZ_TRACK_ID"] = str(qz_track_id)
if qz_performer.get("id"):
pp["id_tags"]["QOBUZ_ARTIST_ID"] = str(qz_performer["id"])
qz_isrc = qz_result.get("isrc")
if isinstance(qz_isrc, dict):
qz_isrc = qz_isrc.get("value", qz_isrc.get("id", ""))
if qz_isrc:
pp["qobuz_isrc"] = qz_isrc
qz_copyright = qz_result.get("copyright")
if isinstance(qz_copyright, dict):
qz_copyright = qz_copyright.get("text", qz_copyright.get("name", ""))
if isinstance(qz_copyright, str):
pp["qobuz_copyright"] = qz_copyright
qz_album = qz_result.get("album", {})
if isinstance(qz_album, dict):
qz_label_info = qz_album.get("label", {})
if isinstance(qz_label_info, dict) and qz_label_info.get("name"):
pp["qobuz_label"] = qz_label_info["name"]
if not pp["release_year"]:
qz_release = str(qz_album.get("release_date_original", "") or "")
if not qz_release:
qz_ts = qz_album.get("released_at")
if qz_ts and isinstance(qz_ts, (int, float)) and qz_ts > 0:
import datetime as _dt
qz_release = str(_dt.datetime.utcfromtimestamp(qz_ts).year)
if len(qz_release) >= 4 and qz_release[:4].isdigit():
pp["release_year"] = qz_release[:4]
def _process_lastfm_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None:
if cfg.get("lastfm.embed_tags", True) is False:
return
if not track_title or not artist_name:
return
lastfm_worker = getattr(runtime, "lastfm_worker", None)
lf_client = lastfm_worker.client if lastfm_worker else None
if not lf_client:
return
lf_result = _call_source_lookup("Last.fm track", lf_client.get_track_info, artist_name, track_title)
if lf_result:
lf_url = lf_result.get("url")
if lf_url:
pp["lastfm_url"] = lf_url
lf_toptags = lf_result.get("toptags", {})
if isinstance(lf_toptags, dict):
tag_list = lf_toptags.get("tag", [])
if isinstance(tag_list, list):
pp["lastfm_tags"] = [tag.get("name", "") for tag in tag_list if isinstance(tag, dict) and tag.get("name")]
elif isinstance(tag_list, dict) and tag_list.get("name"):
pp["lastfm_tags"] = [tag_list["name"]]
def _process_genius_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None:
if cfg.get("genius.embed_tags", True) is False:
return
if not track_title or not artist_name:
return
import core.genius_client as _genius_module
if time.time() < _genius_module._rate_limit_until:
logger.info("Genius rate-limited, skipping (non-blocking)")
return
genius_worker = getattr(runtime, "genius_worker", None)
g_client = genius_worker.client if genius_worker else None
if not g_client:
return
g_result = _call_source_lookup("Genius track", g_client.search_song, artist_name, track_title)
if g_result:
g_id = g_result.get("id")
if g_id:
pp["id_tags"]["GENIUS_TRACK_ID"] = str(g_id)
g_url = g_result.get("url")
if g_url:
pp["genius_url"] = g_url
def _process_source_enrichment(source_name: str, pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None:
if source_name == "musicbrainz":
_process_musicbrainz_source(pp, metadata, cfg, runtime, track_title, artist_name)
elif source_name == "deezer":
_process_deezer_source(pp, metadata, cfg, runtime, track_title, artist_name)
elif source_name == "audiodb":
_process_audiodb_source(pp, metadata, cfg, runtime, track_title, artist_name)
elif source_name == "tidal":
_process_tidal_source(pp, metadata, cfg, runtime, track_title, artist_name)
elif source_name == "hifi":
_process_hifi_source(pp, metadata, cfg, runtime, track_title, artist_name)
elif source_name == "qobuz":
_process_qobuz_source(pp, metadata, cfg, runtime, track_title, artist_name)
elif source_name == "lastfm":
_process_lastfm_source(pp, metadata, cfg, runtime, track_title, artist_name)
elif source_name == "genius":
_process_genius_source(pp, metadata, cfg, runtime, track_title, artist_name)
def _write_embedded_metadata(audio_file, metadata: dict, pp: dict, cfg, symbols):
filtered_tags: Dict[str, str] = {}
for tag_name, value in pp["id_tags"].items():
config_path = SOURCE_TAG_CONFIG.get(tag_name)
if config_path and not _tag_enabled(cfg, config_path):
continue
filtered_tags[tag_name] = value
written = []
release_year = pp["release_year"]
if isinstance(audio_file.tags, symbols.ID3):
for tag_name, value in filtered_tags.items():
spec = ID3_TAG_MAP.get(tag_name)
if spec:
frame_type, desc = spec
if frame_type == "UFID":
audio_file.tags.add(symbols.UFID(owner=desc, data=str(value).encode("ascii")))
written.append(f"UFID:{desc}")
elif frame_type == "TDOR":
audio_file.tags.add(symbols.TDOR(encoding=3, text=[value]))
written.append("TDOR")
elif frame_type == "TMED":
audio_file.tags.add(symbols.TMED(encoding=3, text=[value]))
written.append("TMED")
else:
audio_file.tags.add(symbols.TXXX(encoding=3, desc=desc, text=[value]))
written.append(f"TXXX:{desc}")
else:
audio_file.tags.add(symbols.TXXX(encoding=3, desc=tag_name, text=[str(value)]))
written.append(f"TXXX:{tag_name}")
elif isinstance(audio_file, symbols.MP4):
for tag_name, value in filtered_tags.items():
key = f"----:com.apple.iTunes:{MP4_TAG_MAP.get(tag_name, tag_name)}"
audio_file[key] = [symbols.MP4FreeForm(str(value).encode("utf-8"))]
written.append(key)
elif is_vorbis_like(audio_file, symbols):
for tag_name, value in filtered_tags.items():
audio_file[VORBIS_TAG_MAP.get(tag_name, tag_name)] = [str(value)]
written.append(VORBIS_TAG_MAP.get(tag_name, tag_name))
if written:
logger.info("Embedded IDs: %s", ", ".join(written))
if release_year and not metadata.get("date"):
metadata["date"] = release_year
if isinstance(audio_file.tags, symbols.ID3):
audio_file.tags.add(symbols.TDRC(encoding=3, text=[release_year]))
elif is_vorbis_like(audio_file, symbols):
audio_file["date"] = [release_year]
elif isinstance(audio_file, symbols.MP4):
audio_file["\xa9day"] = [release_year]
logger.info("Date tag: %s", release_year)
bpm_candidates = []
if pp["deezer_bpm"] and pp["deezer_bpm"] > 0 and _tag_enabled(cfg, "deezer.tags.bpm"):
bpm_candidates.append(("Deezer", pp["deezer_bpm"]))
if pp["tidal_bpm"] and pp["tidal_bpm"] > 0 and _tag_enabled(cfg, "tidal.tags.bpm"):
bpm_candidates.append(("Tidal", pp["tidal_bpm"]))
if pp["hifi_bpm"] and pp["hifi_bpm"] > 0 and _tag_enabled(cfg, "hifi.tags.bpm"):
bpm_candidates.append(("HiFi", pp["hifi_bpm"]))
if bpm_candidates:
bpm_source, bpm_val = bpm_candidates[0]
bpm_int = int(bpm_val)
if isinstance(audio_file.tags, symbols.ID3):
audio_file.tags.add(symbols.TBPM(encoding=3, text=[str(bpm_int)]))
elif is_vorbis_like(audio_file, symbols):
audio_file["BPM"] = [str(bpm_int)]
elif isinstance(audio_file, symbols.MP4):
audio_file["tmpo"] = [bpm_int]
logger.info("BPM (%s): %s", bpm_source, bpm_int)
if _tag_enabled(cfg, "audiodb.tags.mood") and pp["audiodb_mood"]:
if isinstance(audio_file.tags, symbols.ID3):
audio_file.tags.add(symbols.TXXX(encoding=3, desc="MOOD", text=[pp["audiodb_mood"]]))
elif is_vorbis_like(audio_file, symbols):
audio_file["MOOD"] = [pp["audiodb_mood"]]
elif isinstance(audio_file, symbols.MP4):
audio_file["----:com.apple.iTunes:MOOD"] = [symbols.MP4FreeForm(pp["audiodb_mood"].encode("utf-8"))]
if _tag_enabled(cfg, "audiodb.tags.style") and pp["audiodb_style"]:
if isinstance(audio_file.tags, symbols.ID3):
audio_file.tags.add(symbols.TXXX(encoding=3, desc="STYLE", text=[pp["audiodb_style"]]))
elif is_vorbis_like(audio_file, symbols):
audio_file["STYLE"] = [pp["audiodb_style"]]
elif isinstance(audio_file, symbols.MP4):
audio_file["----:com.apple.iTunes:STYLE"] = [symbols.MP4FreeForm(pp["audiodb_style"].encode("utf-8"))]
if _tag_enabled(cfg, "metadata_enhancement.tags.genre_merge"):
enrichment_genres = []
if _tag_enabled(cfg, "musicbrainz.tags.genres"):
enrichment_genres += pp["mb_genres"]
if pp["audiodb_genre"] and _tag_enabled(cfg, "audiodb.tags.genre"):
enrichment_genres.append(pp["audiodb_genre"])
if _tag_enabled(cfg, "lastfm.tags.genres"):
enrichment_genres += pp["lastfm_tags"]
if enrichment_genres:
from core.genre_filter import filter_genres as _filter_genres
enrichment_genres = _filter_genres(enrichment_genres, cfg)
source_genres = [g.strip() for g in str(metadata.get("genre", "")).split(",") if g.strip()]
seen = set()
merged = []
for genre in source_genres + enrichment_genres:
key = genre.strip().lower()
if key and key not in seen:
seen.add(key)
merged.append(genre.strip().title())
if len(merged) >= 5:
break
if merged:
genre_string = ", ".join(merged)
if isinstance(audio_file.tags, symbols.ID3):
audio_file.tags.add(symbols.TCON(encoding=3, text=[genre_string]))
elif is_vorbis_like(audio_file, symbols):
audio_file["GENRE"] = [genre_string]
elif isinstance(audio_file, symbols.MP4):
audio_file["\xa9gen"] = [genre_string]
logger.info("Genres merged: %s", genre_string)
isrc_candidates = []
if pp["isrc"] and _tag_enabled(cfg, "musicbrainz.tags.isrc"):
isrc_candidates.append(("MusicBrainz", pp["isrc"]))
if pp["deezer_isrc"] and _tag_enabled(cfg, "deezer.tags.isrc"):
isrc_candidates.append(("Deezer", pp["deezer_isrc"]))
if pp["tidal_isrc"] and _tag_enabled(cfg, "tidal.tags.isrc"):
isrc_candidates.append(("Tidal", pp["tidal_isrc"]))
if pp["hifi_isrc"] and _tag_enabled(cfg, "hifi.tags.isrc"):
isrc_candidates.append(("HiFi", pp["hifi_isrc"]))
if pp["qobuz_isrc"] and _tag_enabled(cfg, "qobuz.tags.isrc"):
isrc_candidates.append(("Qobuz", pp["qobuz_isrc"]))
if isrc_candidates:
isrc_source, final_isrc = isrc_candidates[0]
if isinstance(audio_file.tags, symbols.ID3):
audio_file.tags.add(symbols.TSRC(encoding=3, text=[final_isrc]))
elif is_vorbis_like(audio_file, symbols):
audio_file["ISRC"] = [final_isrc]
elif isinstance(audio_file, symbols.MP4):
audio_file["----:com.apple.iTunes:ISRC"] = [symbols.MP4FreeForm(final_isrc.encode("utf-8"))]
logger.info("ISRC (%s): %s", isrc_source, final_isrc)
copyright_candidates = []
if pp["tidal_copyright"] and _tag_enabled(cfg, "tidal.tags.copyright"):
copyright_candidates.append(("Tidal", pp["tidal_copyright"]))
if pp["qobuz_copyright"] and _tag_enabled(cfg, "qobuz.tags.copyright"):
copyright_candidates.append(("Qobuz", pp["qobuz_copyright"]))
if pp["hifi_copyright"] and _tag_enabled(cfg, "hifi.tags.copyright"):
copyright_candidates.append(("HiFi", pp["hifi_copyright"]))
if copyright_candidates:
copyright_source, final_copyright = copyright_candidates[0]
if isinstance(audio_file.tags, symbols.ID3):
audio_file.tags.add(symbols.TCOP(encoding=3, text=[final_copyright]))
elif is_vorbis_like(audio_file, symbols):
audio_file["COPYRIGHT"] = [final_copyright]
elif isinstance(audio_file, symbols.MP4):
audio_file["cprt"] = [final_copyright]
logger.info("Copyright (%s): %s", copyright_source, final_copyright[:60])
if _tag_enabled(cfg, "qobuz.tags.label") and pp["qobuz_label"]:
if isinstance(audio_file.tags, symbols.ID3):
audio_file.tags.add(symbols.TPUB(encoding=3, text=[pp["qobuz_label"]]))
elif is_vorbis_like(audio_file, symbols):
audio_file["LABEL"] = [pp["qobuz_label"]]
elif isinstance(audio_file, symbols.MP4):
audio_file["----:com.apple.iTunes:LABEL"] = [symbols.MP4FreeForm(pp["qobuz_label"].encode("utf-8"))]
if _tag_enabled(cfg, "lastfm.tags.url") and pp["lastfm_url"]:
if isinstance(audio_file.tags, symbols.ID3):
audio_file.tags.add(symbols.TXXX(encoding=3, desc="LASTFM_URL", text=[pp["lastfm_url"]]))
elif is_vorbis_like(audio_file, symbols):
audio_file["LASTFM_URL"] = [pp["lastfm_url"]]
elif isinstance(audio_file, symbols.MP4):
audio_file["----:com.apple.iTunes:LASTFM_URL"] = [symbols.MP4FreeForm(pp["lastfm_url"].encode("utf-8"))]
if _tag_enabled(cfg, "genius.tags.url") and pp["genius_url"]:
if isinstance(audio_file.tags, symbols.ID3):
audio_file.tags.add(symbols.TXXX(encoding=3, desc="GENIUS_URL", text=[pp["genius_url"]]))
elif is_vorbis_like(audio_file, symbols):
audio_file["GENIUS_URL"] = [pp["genius_url"]]
elif isinstance(audio_file, symbols.MP4):
audio_file["----:com.apple.iTunes:GENIUS_URL"] = [symbols.MP4FreeForm(pp["genius_url"].encode("utf-8"))]
return release_year
def _update_album_year_in_database(db, metadata: dict, release_year) -> None:
if db is None:
return
try:
album_name_for_db = metadata.get("album", "")
album_artist_for_db = metadata.get("album_artist", "") or metadata.get("artist", "")
if album_name_for_db and album_artist_for_db:
conn = db._get_connection()
try:
cursor = conn.cursor()
cursor.execute(
"""
UPDATE albums SET year = ?
WHERE (year IS NULL OR year = 0)
AND id IN (
SELECT al.id FROM albums al
JOIN artists ar ON ar.id = al.artist_id
WHERE LOWER(al.title) = LOWER(?) AND LOWER(ar.name) = LOWER(?)
)
""",
(int(release_year), album_name_for_db, album_artist_for_db),
)
if cursor.rowcount > 0:
conn.commit()
logger.info("Updated album year to %s in database", release_year)
else:
conn.rollback()
finally:
conn.close()
except Exception as exc:
logger.error("Could not update album year in DB: %s", exc)
def extract_source_metadata(context: dict, artist: dict, album_info: dict) -> dict:
if album_info is None:
album_info = {}
cfg = get_config_manager()
context = normalize_import_context(context)
original_search = get_import_original_search(context)
album_ctx = get_import_context_album(context)
track_info = get_import_track_info(context)
source = get_import_source(context)
source_ids = get_import_source_ids(context)
artist_dict = artist if isinstance(artist, dict) else {
"name": extract_artist_name(artist),
"id": getattr(artist, "id", ""),
"genres": list(getattr(artist, "genres", []) or []),
}
metadata: Dict[str, Any] = {
"source": source,
"source_track_id": source_ids["track_id"],
"source_artist_id": source_ids["artist_id"],
"source_album_id": source_ids["album_id"],
}
metadata["title"] = get_import_clean_title(context, album_info=album_info, default=original_search.get("title", ""))
if original_search.get("clean_title"):
logger.info("Metadata: Using clean title: '%s'", metadata["title"])
elif album_info.get("clean_track_name"):
logger.info("Metadata: Using album info clean name: '%s'", metadata["title"])
else:
logger.warning("Metadata: Using original title as fallback: '%s'", metadata["title"])
# Resolve canonical artists list. Soulseek matched-download contexts
# only carry `original_search.artist` (singular string) — the full
# contributors list lives on `track_info` (the matched Spotify/etc
# track object). Deezer-direct contexts populate `original_search.artists`
# directly. Pure helper handles all three shapes.
all_artists = resolve_track_artists(original_search, track_info, artist_dict)
if all_artists:
# Deezer upgrade path: Deezer's `/search` endpoint only returns
# the primary artist for each track. The full contributors
# array (feat., remix collaborators, producers credited as
# artists) lives on `/track/<id>` and gets parsed by
# `_build_enhanced_track`. Without this upgrade Deezer-sourced
# tracks never get multi-artist tags even with the right
# settings on. One extra API call per Deezer-sourced track,
# only when the search response had a single artist (so it's
# a no-op when search already returned multiple).
if (source == "deezer" and len(all_artists) == 1
and source_ids.get("track_id")):
try:
from core.metadata import get_deezer_client
deezer = get_deezer_client()
if deezer:
full = deezer.get_track_details(str(source_ids["track_id"]))
if full and isinstance(full.get("artists"), list) and len(full["artists"]) > 1:
upgraded = [a for a in full["artists"] if a]
if upgraded:
logger.info(
"Metadata: Deezer contributors upgrade — search returned "
"%d artist, /track/<id> returned %d (%s)",
len(all_artists), len(upgraded), upgraded,
)
all_artists = upgraded
except Exception as e:
logger.debug("Deezer contributors upgrade failed: %s", e)
# Store the multi-artist list so the enrichment writer can emit
# proper multi-value ARTIST tags (TPE1 multi-value for ID3,
# "artists" key for Vorbis) when `write_multi_artist` is on.
# Without this assignment the field was always empty and the
# multi-artist write path silently no-op'd.
metadata["_artists_list"] = list(all_artists)
# `feat_in_title` (when true): pull featured artists out of the
# ARTIST tag entirely and append "(feat. X, Y)" to the title.
# Matches Picard / Beets convention and lets media servers
# group by primary artist instead of treating "A, B & C" as a
# distinct artist string.
# `artist_separator`: when feat_in_title is off (or there's
# only one artist) and write_multi_artist is on, this is the
# delimiter used to join all artists into the single ARTIST
# string. Picard defaults to "; " — we default to ", " to
# preserve historical behavior for users who haven't touched
# the setting.
feat_in_title = cfg.get("metadata_enhancement.tags.feat_in_title", False)
artist_separator = cfg.get("metadata_enhancement.tags.artist_separator", ", ")
if feat_in_title and len(all_artists) > 1:
metadata["artist"] = all_artists[0]
featured = all_artists[1:]
existing_title = metadata.get("title", "") or ""
# Don't double-append if the title already carries the
# featured artists. Source titles vary: "(feat. X)",
# "(featuring X)", "(ft. X)", "ft. X" (no parens), "[feat X]"
# (no period, brackets), etc. Word-boundary regex catches
# `feat`, `feat.`, `featuring`, `ft`, `ft.` regardless of
# surrounding punctuation. Case-insensitive.
import re as _feat_re
already_has_feat = bool(_feat_re.search(
r'\b(?:feat|feat\.|featuring|ft|ft\.)\b',
existing_title,
_feat_re.IGNORECASE,
))
if existing_title and not already_has_feat:
metadata["title"] = f"{existing_title} (feat. {', '.join(featured)})"
logger.info(
"Metadata: feat_in_title — primary='%s', featured=%s, title='%s'",
metadata["artist"], featured, metadata["title"],
)
else:
metadata["artist"] = artist_separator.join(all_artists)
logger.info(
"Metadata: Using all artists joined with %r: '%s'",
artist_separator, metadata["artist"],
)
else:
metadata["artist"] = artist_dict.get("name", "") or get_import_clean_artist(context)
logger.info("Metadata: Using primary artist: '%s'", metadata["artist"])
raw_album_artist = artist_dict.get("name", "") or metadata["artist"]
track_info_ctx = track_info or {}
explicit_artist = track_info_ctx.get("_explicit_artist_context") if isinstance(track_info_ctx, dict) else None
album_artists_for_collab = None
if isinstance(explicit_artist, dict) and explicit_artist.get("name"):
raw_album_artist = explicit_artist["name"]
album_artists_for_collab = [explicit_artist]
elif isinstance(explicit_artist, str) and explicit_artist:
raw_album_artist = explicit_artist
album_artists_for_collab = [{"name": explicit_artist}]
elif album_ctx and isinstance(album_ctx, dict):
album_artists = album_ctx.get("artists", [])
if album_artists:
first_album_artist = album_artists[0]
if isinstance(first_album_artist, dict) and first_album_artist.get("name"):
raw_album_artist = first_album_artist["name"]
elif isinstance(first_album_artist, str) and first_album_artist:
raw_album_artist = first_album_artist
album_artists_for_collab = album_artists
collab_mode = cfg.get("file_organization.collab_artist_mode", "first")
if collab_mode == "first" and raw_album_artist:
context_artists = album_artists_for_collab or original_search.get("artists") or track_info_ctx.get("artists") or []
if len(context_artists) > 1:
first = context_artists[0]
raw_album_artist = first.get("name", first) if isinstance(first, dict) else str(first)
elif len(context_artists) == 1 and ("," in raw_album_artist or " & " in raw_album_artist):
artist_id = str(artist_dict.get("id", ""))
if source == "itunes" and artist_id.isdigit():
try:
itunes_client = get_itunes_client()
if itunes_client and hasattr(itunes_client, "resolve_primary_artist"):
resolved = itunes_client.resolve_primary_artist(artist_id)
if resolved and resolved != raw_album_artist:
raw_album_artist = resolved
except Exception as e:
logger.debug("itunes primary artist resolve failed: %s", e)
metadata["album_artist"] = raw_album_artist
if album_info.get("is_album"):
metadata["album"] = album_info.get("album_name", "Unknown Album")
metadata["track_number"] = album_info.get("track_number", 1)
metadata["total_tracks"] = album_ctx.get("total_tracks", 1) if album_ctx else 1
logger.info("[METADATA] Album track - track_number: %s, album: %s", metadata["track_number"], metadata["album"])
else:
if album_ctx and album_ctx.get("name"):
logger.info("[SAFEGUARD] Using album context name instead of track title for album metadata")
metadata["album"] = album_ctx["name"]
metadata["track_number"] = album_info.get("track_number", 1) if album_info else 1
metadata["total_tracks"] = album_ctx.get("total_tracks", 1)
else:
metadata["album"] = metadata["title"]
metadata["track_number"] = 1
metadata["total_tracks"] = 1
disc_num = original_search.get("disc_number")
if disc_num is None and album_info:
disc_num = album_info.get("disc_number")
metadata["disc_number"] = disc_num if disc_num is not None else 1
if album_ctx and album_ctx.get("release_date"):
release_date = _normalize_release_date_tag(album_ctx.get("release_date"))
if release_date:
metadata["date"] = release_date
genres = artist_dict.get("genres") or []
if genres:
from core.genre_filter import filter_genres
filtered = filter_genres(list(genres[:2]), cfg)
if filtered:
metadata["genre"] = ", ".join(filtered)
metadata["album_art_url"] = album_info.get("album_image_url") if album_info else None
if not metadata["album_art_url"] and album_ctx:
album_image = album_ctx.get("image_url")
if not album_image and album_ctx.get("images"):
first_image = album_ctx["images"][0]
album_image = first_image.get("url") if isinstance(first_image, dict) else None
metadata["album_art_url"] = album_image
logger.info(
"[Metadata Summary] title='%s' | artist='%s' | album_artist='%s' | album='%s' | date=%s | track=%s/%s | disc=%s",
metadata.get("title"),
metadata.get("artist"),
metadata.get("album_artist"),
metadata.get("album"),
metadata.get("date", ""),
metadata.get("track_number"),
metadata.get("total_tracks"),
metadata.get("disc_number"),
)
return metadata
def embed_source_ids(audio_file, metadata: dict, context: dict = None, runtime=None):
cfg = get_config_manager()
symbols = get_mutagen_symbols()
if not symbols:
return
try:
context = normalize_import_context(context)
source_ids = _collect_source_ids(metadata, cfg)
track_title = metadata.get("title", "")
artist_name = metadata.get("album_artist", "") or metadata.get("artist", "")
track_info = get_import_track_info(context)
explicit_artist = (track_info or {}).get("_explicit_artist_context") if isinstance(track_info, dict) else None
batch_artist_name = None
if isinstance(explicit_artist, dict) and explicit_artist.get("name"):
batch_artist_name = explicit_artist["name"]
elif isinstance(explicit_artist, str) and explicit_artist:
batch_artist_name = explicit_artist
pp = {
"id_tags": source_ids,
"track_title": track_title,
"artist_name": artist_name,
"batch_artist_name": batch_artist_name,
"metadata": metadata,
"recording_mbid": None,
"artist_mbid": None,
"release_mbid": "",
"mb_genres": [],
"isrc": None,
"deezer_bpm": None,
"deezer_isrc": None,
"tidal_bpm": None,
"hifi_bpm": None,
"hifi_copyright": None,
"audiodb_mood": None,
"audiodb_style": None,
"audiodb_genre": None,
"tidal_isrc": None,
"tidal_copyright": None,
"hifi_isrc": None,
"qobuz_isrc": None,
"qobuz_copyright": None,
"qobuz_label": None,
"lastfm_tags": [],
"lastfm_url": None,
"genius_url": None,
"release_year": None,
}
source_order = cfg.get("metadata_enhancement.post_process_order", None)
if not isinstance(source_order, list) or not source_order:
source_order = DEFAULT_SOURCE_ORDER
# If this download came from HiFi, use cached metadata from the download
# pipeline instead of re-searching the HiFi API.
original_search = get_import_original_search(context)
cached_meta = original_search.get("_source_metadata") or {}
if cached_meta.get("source") == "hifi":
if _tag_enabled(cfg, "hifi.embed_tags"):
if cfg.get("hifi.tags.track_id", True) and cached_meta.get("track_id"):
pp["id_tags"]["HIFI_TRACK_ID"] = str(cached_meta["track_id"])
if cfg.get("hifi.tags.artist_id", True) and cached_meta.get("artist_id"):
pp["id_tags"]["HIFI_ARTIST_ID"] = str(cached_meta["artist_id"])
if cfg.get("hifi.tags.isrc", True) and cached_meta.get("isrc"):
pp["hifi_isrc"] = cached_meta["isrc"]
if cfg.get("hifi.tags.bpm", True) and cached_meta.get("bpm"):
pp["hifi_bpm"] = cached_meta["bpm"]
if cfg.get("hifi.tags.copyright", True) and cached_meta.get("copyright"):
pp["hifi_copyright"] = cached_meta["copyright"]
source_order = [s for s in source_order if s != "hifi"]
# If this download came from Tidal, use cached metadata from the download
# pipeline instead of re-searching the Tidal API.
if cached_meta.get("source") == "tidal":
if _tag_enabled(cfg, "tidal.embed_tags"):
if cfg.get("tidal.tags.track_id", True) and cached_meta.get("track_id"):
pp["id_tags"]["TIDAL_TRACK_ID"] = str(cached_meta["track_id"])
if cfg.get("tidal.tags.artist_id", True) and cached_meta.get("artist_id"):
pp["id_tags"]["TIDAL_ARTIST_ID"] = str(cached_meta["artist_id"])
if cfg.get("tidal.tags.isrc", True) and cached_meta.get("isrc"):
pp["tidal_isrc"] = cached_meta["isrc"]
if cfg.get("tidal.tags.bpm", True) and cached_meta.get("bpm"):
pp["tidal_bpm"] = cached_meta["bpm"]
if cfg.get("tidal.tags.copyright", True) and cached_meta.get("copyright"):
pp["tidal_copyright"] = cached_meta["copyright"]
source_order = [s for s in source_order if s != "tidal"]
db = get_database()
for source_name in source_order:
_process_source_enrichment(source_name, pp, metadata, cfg, runtime, track_title, artist_name)
if not pp["id_tags"] and not pp["deezer_bpm"] and not pp["deezer_isrc"] and not pp["tidal_bpm"] and not pp["hifi_bpm"] and not pp["hifi_copyright"] and not pp["audiodb_mood"] and not pp["audiodb_style"]:
return
release_year = _write_embedded_metadata(audio_file, metadata, pp, cfg, symbols)
release_id = pp["release_mbid"]
if release_id:
metadata["musicbrainz_release_id"] = release_id
_update_album_year_in_database(db, metadata, release_year)
# Expose the final ID tag set + per-source values back to the
# import context so downstream side-effects (notably
# ``record_download_provenance``) can persist them to the
# ``track_downloads`` table without re-collecting. Without this,
# the watchlist scanner would have to wait for the async
# enrichment workers to backfill ``tracks.spotify_track_id`` etc.
# before recognizing freshly downloaded files.
if isinstance(context, dict):
try:
context["_embedded_id_tags"] = dict(pp.get("id_tags") or {})
isrc_value = (
pp.get("isrc") or pp.get("deezer_isrc") or pp.get("tidal_isrc")
or pp.get("hifi_isrc") or pp.get("qobuz_isrc")
)
if isrc_value:
context["_isrc"] = str(isrc_value)
except Exception as e:
logger.debug("context isrc copy failed: %s", e)
except Exception as exc:
logger.error("Error embedding source IDs (non-fatal): %s", exc)