|
|
|
|
@ -124,12 +124,14 @@ SOURCE_TAG_CONFIG = {
|
|
|
|
|
"AUDIODB_TRACK_ID": "audiodb.tags.track_id",
|
|
|
|
|
"TIDAL_TRACK_ID": "tidal.tags.track_id",
|
|
|
|
|
"TIDAL_ARTIST_ID": "tidal.tags.artist_id",
|
|
|
|
|
"HIFI_TRACK_ID": "hifi.tags.track_id",
|
|
|
|
|
"HIFI_ARTIST_ID": "hifi.tags.artist_id",
|
|
|
|
|
"QOBUZ_TRACK_ID": "qobuz.tags.track_id",
|
|
|
|
|
"QOBUZ_ARTIST_ID": "qobuz.tags.artist_id",
|
|
|
|
|
"GENIUS_TRACK_ID": "genius.tags.track_id",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
DEFAULT_SOURCE_ORDER = ["musicbrainz", "deezer", "audiodb", "tidal", "qobuz", "lastfm", "genius"]
|
|
|
|
|
DEFAULT_SOURCE_ORDER = ["musicbrainz", "deezer", "audiodb", "tidal", "hifi", "qobuz", "lastfm", "genius"]
|
|
|
|
|
|
|
|
|
|
ID3_TAG_MAP = {
|
|
|
|
|
"MUSICBRAINZ_RECORDING_ID": ("UFID", "http://musicbrainz.org"),
|
|
|
|
|
@ -452,6 +454,9 @@ def _process_tidal_source(pp: dict, metadata: dict, cfg, runtime, track_title: s
|
|
|
|
|
td_details = _call_source_lookup("Tidal track details", tidal_client.get_track, str(td_track_id))
|
|
|
|
|
if td_details:
|
|
|
|
|
pp["tidal_isrc"] = td_details.get("isrc")
|
|
|
|
|
td_bpm = td_details.get("bpm")
|
|
|
|
|
if td_bpm and td_bpm > 0:
|
|
|
|
|
pp["tidal_bpm"] = td_bpm
|
|
|
|
|
td_copyright = td_details.get("copyright")
|
|
|
|
|
if isinstance(td_copyright, dict):
|
|
|
|
|
td_copyright = td_copyright.get("text", td_copyright.get("name", ""))
|
|
|
|
|
@ -465,6 +470,47 @@ def _process_tidal_source(pp: dict, metadata: dict, cfg, runtime, track_title: s
|
|
|
|
|
pp["release_year"] = td_release[:4]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _process_hifi_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None:
|
|
|
|
|
if cfg.get("hifi.embed_tags", True) is False:
|
|
|
|
|
return
|
|
|
|
|
if not track_title or not artist_name:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
hifi_client = getattr(runtime, "hifi_client", None)
|
|
|
|
|
if not hifi_client:
|
|
|
|
|
return
|
|
|
|
|
hifi_results = _call_source_lookup("HiFi track", hifi_client.search_tracks, track_title, artist_name)
|
|
|
|
|
if hifi_results and len(hifi_results) > 0:
|
|
|
|
|
hifi_track = hifi_results[0]
|
|
|
|
|
if _names_match(hifi_track.get("title", ""), track_title):
|
|
|
|
|
hifi_track_id = hifi_track.get("id")
|
|
|
|
|
if hifi_track_id:
|
|
|
|
|
pp["id_tags"]["HIFI_TRACK_ID"] = str(hifi_track_id)
|
|
|
|
|
hifi_artist_id = hifi_track.get("artist_id")
|
|
|
|
|
if hifi_artist_id:
|
|
|
|
|
pp["id_tags"]["HIFI_ARTIST_ID"] = str(hifi_artist_id)
|
|
|
|
|
if hifi_track_id:
|
|
|
|
|
hifi_details = _call_source_lookup("HiFi track details", hifi_client.get_track_info, hifi_track_id)
|
|
|
|
|
if hifi_details:
|
|
|
|
|
hifi_isrc = hifi_details.get("isrc")
|
|
|
|
|
if hifi_isrc:
|
|
|
|
|
pp["hifi_isrc"] = hifi_isrc
|
|
|
|
|
hifi_bpm = hifi_details.get("bpm")
|
|
|
|
|
if hifi_bpm and hifi_bpm > 0:
|
|
|
|
|
pp["hifi_bpm"] = hifi_bpm
|
|
|
|
|
hifi_copyright = hifi_details.get("copyright")
|
|
|
|
|
if hifi_copyright:
|
|
|
|
|
pp["hifi_copyright"] = hifi_copyright
|
|
|
|
|
if not pp["release_year"]:
|
|
|
|
|
hifi_album_id = hifi_track.get("album_id")
|
|
|
|
|
if hifi_album_id:
|
|
|
|
|
hifi_album = _call_source_lookup("HiFi album", hifi_client.get_album, hifi_album_id)
|
|
|
|
|
if hifi_album:
|
|
|
|
|
hifi_release = str(hifi_album.get("release_date", "") or "")
|
|
|
|
|
if len(hifi_release) >= 4 and hifi_release[:4].isdigit():
|
|
|
|
|
pp["release_year"] = hifi_release[:4]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _process_qobuz_source(pp: dict, metadata: dict, cfg, runtime, track_title: str, artist_name: str) -> None:
|
|
|
|
|
if cfg.get("qobuz.embed_tags", True) is False:
|
|
|
|
|
return
|
|
|
|
|
@ -572,6 +618,8 @@ def _process_source_enrichment(source_name: str, pp: dict, metadata: dict, cfg,
|
|
|
|
|
_process_audiodb_source(pp, metadata, cfg, runtime, track_title, artist_name)
|
|
|
|
|
elif source_name == "tidal":
|
|
|
|
|
_process_tidal_source(pp, metadata, cfg, runtime, track_title, artist_name)
|
|
|
|
|
elif source_name == "hifi":
|
|
|
|
|
_process_hifi_source(pp, metadata, cfg, runtime, track_title, artist_name)
|
|
|
|
|
elif source_name == "qobuz":
|
|
|
|
|
_process_qobuz_source(pp, metadata, cfg, runtime, track_title, artist_name)
|
|
|
|
|
elif source_name == "lastfm":
|
|
|
|
|
@ -634,15 +682,23 @@ def _write_embedded_metadata(audio_file, metadata: dict, pp: dict, cfg, symbols)
|
|
|
|
|
audio_file["\xa9day"] = [release_year]
|
|
|
|
|
logger.info("Date tag: %s", release_year)
|
|
|
|
|
|
|
|
|
|
if _tag_enabled(cfg, "deezer.tags.bpm") and pp["deezer_bpm"] and pp["deezer_bpm"] > 0:
|
|
|
|
|
bpm_int = int(pp["deezer_bpm"])
|
|
|
|
|
bpm_candidates = []
|
|
|
|
|
if pp["deezer_bpm"] and pp["deezer_bpm"] > 0 and _tag_enabled(cfg, "deezer.tags.bpm"):
|
|
|
|
|
bpm_candidates.append(("Deezer", pp["deezer_bpm"]))
|
|
|
|
|
if pp["tidal_bpm"] and pp["tidal_bpm"] > 0 and _tag_enabled(cfg, "tidal.tags.bpm"):
|
|
|
|
|
bpm_candidates.append(("Tidal", pp["tidal_bpm"]))
|
|
|
|
|
if pp["hifi_bpm"] and pp["hifi_bpm"] > 0 and _tag_enabled(cfg, "hifi.tags.bpm"):
|
|
|
|
|
bpm_candidates.append(("HiFi", pp["hifi_bpm"]))
|
|
|
|
|
if bpm_candidates:
|
|
|
|
|
bpm_source, bpm_val = bpm_candidates[0]
|
|
|
|
|
bpm_int = int(bpm_val)
|
|
|
|
|
if isinstance(audio_file.tags, symbols.ID3):
|
|
|
|
|
audio_file.tags.add(symbols.TBPM(encoding=3, text=[str(bpm_int)]))
|
|
|
|
|
elif is_vorbis_like(audio_file, symbols):
|
|
|
|
|
audio_file["BPM"] = [str(bpm_int)]
|
|
|
|
|
elif isinstance(audio_file, symbols.MP4):
|
|
|
|
|
audio_file["tmpo"] = [bpm_int]
|
|
|
|
|
logger.info("BPM: %s", bpm_int)
|
|
|
|
|
logger.info("BPM (%s): %s", bpm_source, bpm_int)
|
|
|
|
|
|
|
|
|
|
if _tag_enabled(cfg, "audiodb.tags.mood") and pp["audiodb_mood"]:
|
|
|
|
|
if isinstance(audio_file.tags, symbols.ID3):
|
|
|
|
|
@ -699,6 +755,8 @@ def _write_embedded_metadata(audio_file, metadata: dict, pp: dict, cfg, symbols)
|
|
|
|
|
isrc_candidates.append(("Deezer", pp["deezer_isrc"]))
|
|
|
|
|
if pp["tidal_isrc"] and _tag_enabled(cfg, "tidal.tags.isrc"):
|
|
|
|
|
isrc_candidates.append(("Tidal", pp["tidal_isrc"]))
|
|
|
|
|
if pp["hifi_isrc"] and _tag_enabled(cfg, "hifi.tags.isrc"):
|
|
|
|
|
isrc_candidates.append(("HiFi", pp["hifi_isrc"]))
|
|
|
|
|
if pp["qobuz_isrc"] and _tag_enabled(cfg, "qobuz.tags.isrc"):
|
|
|
|
|
isrc_candidates.append(("Qobuz", pp["qobuz_isrc"]))
|
|
|
|
|
if isrc_candidates:
|
|
|
|
|
@ -716,6 +774,8 @@ def _write_embedded_metadata(audio_file, metadata: dict, pp: dict, cfg, symbols)
|
|
|
|
|
copyright_candidates.append(("Tidal", pp["tidal_copyright"]))
|
|
|
|
|
if pp["qobuz_copyright"] and _tag_enabled(cfg, "qobuz.tags.copyright"):
|
|
|
|
|
copyright_candidates.append(("Qobuz", pp["qobuz_copyright"]))
|
|
|
|
|
if pp["hifi_copyright"] and _tag_enabled(cfg, "hifi.tags.copyright"):
|
|
|
|
|
copyright_candidates.append(("HiFi", pp["hifi_copyright"]))
|
|
|
|
|
if copyright_candidates:
|
|
|
|
|
copyright_source, final_copyright = copyright_candidates[0]
|
|
|
|
|
if isinstance(audio_file.tags, symbols.ID3):
|
|
|
|
|
@ -963,11 +1023,15 @@ def embed_source_ids(audio_file, metadata: dict, context: dict = None, runtime=N
|
|
|
|
|
"isrc": None,
|
|
|
|
|
"deezer_bpm": None,
|
|
|
|
|
"deezer_isrc": None,
|
|
|
|
|
"tidal_bpm": None,
|
|
|
|
|
"hifi_bpm": None,
|
|
|
|
|
"hifi_copyright": None,
|
|
|
|
|
"audiodb_mood": None,
|
|
|
|
|
"audiodb_style": None,
|
|
|
|
|
"audiodb_genre": None,
|
|
|
|
|
"tidal_isrc": None,
|
|
|
|
|
"tidal_copyright": None,
|
|
|
|
|
"hifi_isrc": None,
|
|
|
|
|
"qobuz_isrc": None,
|
|
|
|
|
"qobuz_copyright": None,
|
|
|
|
|
"qobuz_label": None,
|
|
|
|
|
@ -981,12 +1045,46 @@ def embed_source_ids(audio_file, metadata: dict, context: dict = None, runtime=N
|
|
|
|
|
if not isinstance(source_order, list) or not source_order:
|
|
|
|
|
source_order = DEFAULT_SOURCE_ORDER
|
|
|
|
|
|
|
|
|
|
# If this download came from HiFi, use cached metadata from the download
|
|
|
|
|
# pipeline instead of re-searching the HiFi API.
|
|
|
|
|
original_search = get_import_original_search(context)
|
|
|
|
|
cached_meta = original_search.get("_source_metadata") or {}
|
|
|
|
|
if cached_meta.get("source") == "hifi":
|
|
|
|
|
if _tag_enabled(cfg, "hifi.embed_tags"):
|
|
|
|
|
if cfg.get("hifi.tags.track_id", True) and cached_meta.get("track_id"):
|
|
|
|
|
pp["id_tags"]["HIFI_TRACK_ID"] = str(cached_meta["track_id"])
|
|
|
|
|
if cfg.get("hifi.tags.artist_id", True) and cached_meta.get("artist_id"):
|
|
|
|
|
pp["id_tags"]["HIFI_ARTIST_ID"] = str(cached_meta["artist_id"])
|
|
|
|
|
if cfg.get("hifi.tags.isrc", True) and cached_meta.get("isrc"):
|
|
|
|
|
pp["hifi_isrc"] = cached_meta["isrc"]
|
|
|
|
|
if cfg.get("hifi.tags.bpm", True) and cached_meta.get("bpm"):
|
|
|
|
|
pp["hifi_bpm"] = cached_meta["bpm"]
|
|
|
|
|
if cfg.get("hifi.tags.copyright", True) and cached_meta.get("copyright"):
|
|
|
|
|
pp["hifi_copyright"] = cached_meta["copyright"]
|
|
|
|
|
source_order = [s for s in source_order if s != "hifi"]
|
|
|
|
|
|
|
|
|
|
# If this download came from Tidal, use cached metadata from the download
|
|
|
|
|
# pipeline instead of re-searching the Tidal API.
|
|
|
|
|
if cached_meta.get("source") == "tidal":
|
|
|
|
|
if _tag_enabled(cfg, "tidal.embed_tags"):
|
|
|
|
|
if cfg.get("tidal.tags.track_id", True) and cached_meta.get("track_id"):
|
|
|
|
|
pp["id_tags"]["TIDAL_TRACK_ID"] = str(cached_meta["track_id"])
|
|
|
|
|
if cfg.get("tidal.tags.artist_id", True) and cached_meta.get("artist_id"):
|
|
|
|
|
pp["id_tags"]["TIDAL_ARTIST_ID"] = str(cached_meta["artist_id"])
|
|
|
|
|
if cfg.get("tidal.tags.isrc", True) and cached_meta.get("isrc"):
|
|
|
|
|
pp["tidal_isrc"] = cached_meta["isrc"]
|
|
|
|
|
if cfg.get("tidal.tags.bpm", True) and cached_meta.get("bpm"):
|
|
|
|
|
pp["tidal_bpm"] = cached_meta["bpm"]
|
|
|
|
|
if cfg.get("tidal.tags.copyright", True) and cached_meta.get("copyright"):
|
|
|
|
|
pp["tidal_copyright"] = cached_meta["copyright"]
|
|
|
|
|
source_order = [s for s in source_order if s != "tidal"]
|
|
|
|
|
|
|
|
|
|
db = get_database()
|
|
|
|
|
|
|
|
|
|
for source_name in source_order:
|
|
|
|
|
_process_source_enrichment(source_name, pp, metadata, cfg, runtime, track_title, artist_name)
|
|
|
|
|
|
|
|
|
|
if not pp["id_tags"] and not pp["deezer_bpm"] and not pp["deezer_isrc"] and not pp["audiodb_mood"] and not pp["audiodb_style"]:
|
|
|
|
|
if not pp["id_tags"] and not pp["deezer_bpm"] and not pp["deezer_isrc"] and not pp["tidal_bpm"] and not pp["hifi_bpm"] and not pp["hifi_copyright"] and not pp["audiodb_mood"] and not pp["audiodb_style"]:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
release_year = _write_embedded_metadata(audio_file, metadata, pp, cfg, symbols)
|
|
|
|
|
|