You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/imports/context.py

430 lines
17 KiB

"""Helpers for normalizing and reading import contexts.
These functions keep the single-import pipeline source-agnostic while still
accepting legacy `spotify_*` payloads from older callers.
"""
from __future__ import annotations
from typing import Any, Dict, Optional
def _as_dict(value: Any) -> Dict[str, Any]:
return value if isinstance(value, dict) else {}
def _first_value(mapping: Dict[str, Any], *keys: str, default: Any = "") -> Any:
for key in keys:
if key in mapping:
value = mapping.get(key)
if value not in (None, ""):
return value
return default
def _first_id_value(*values: Any) -> str:
for value in values:
if value in (None, ""):
continue
text = str(value).strip()
if text:
return text
return ""
def _first_source_aware_id(source: str, *values: Any) -> str:
source_name = (source or "").strip().lower()
for value in values:
if value in (None, ""):
continue
text = str(value).strip()
if not text:
continue
if source_name.startswith("spotify") and text.isdigit():
continue
return text
return ""
def extract_artist_name(artist: Any) -> str:
if isinstance(artist, dict):
return str(artist.get("name", "") or "")
if hasattr(artist, "name"):
return str(artist.name or "")
return str(artist) if artist else ""
def normalize_import_context(context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""Normalize an import context to neutral fields in place and drop legacy aliases."""
if not isinstance(context, dict):
return {}
source = context.get("source") or context.get("_source") or ""
artist = _as_dict(context.get("artist") or context.get("spotify_artist"))
album = _as_dict(context.get("album") or context.get("spotify_album"))
track_info = _as_dict(context.get("track_info"))
original_search = _as_dict(context.get("original_search_result"))
search_result = _as_dict(context.get("search_result"))
normalized_search = original_search or search_result
if source:
context["source"] = source
context["artist"] = artist
context["album"] = album
context["track_info"] = track_info
context["original_search_result"] = normalized_search
context.pop("_source", None)
context.pop("spotify_artist", None)
context.pop("spotify_album", None)
for clean_key, legacy_key in (
("clean_title", "spotify_clean_title"),
("clean_album", "spotify_clean_album"),
("clean_artist", "spotify_clean_artist"),
):
if clean_key not in normalized_search or normalized_search.get(clean_key) in (None, ""):
legacy_value = normalized_search.get(legacy_key)
if legacy_value not in (None, ""):
normalized_search[clean_key] = legacy_value
normalized_search.pop(legacy_key, None)
has_clean = bool(context.get("has_clean_metadata", context.get("has_clean_spotify_data", False)))
has_full = bool(context.get("has_full_metadata", context.get("has_full_spotify_metadata", False)))
context["has_clean_metadata"] = has_clean
context["has_full_metadata"] = has_full
context.pop("has_clean_spotify_data", None)
context.pop("has_full_spotify_metadata", None)
return context
def get_import_context_artist(context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
if not isinstance(context, dict):
return {}
return _as_dict(context.get("artist") or context.get("spotify_artist"))
def get_import_context_album(context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
if not isinstance(context, dict):
return {}
return _as_dict(context.get("album") or context.get("spotify_album"))
def get_import_track_info(context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
if not isinstance(context, dict):
return {}
return _as_dict(context.get("track_info"))
def get_import_original_search(context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
if not isinstance(context, dict):
return {}
return _as_dict(context.get("original_search_result"))
def get_import_search_result(context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
if not isinstance(context, dict):
return {}
return _as_dict(context.get("search_result"))
def get_import_source(context: Optional[Dict[str, Any]]) -> str:
if not isinstance(context, dict):
return ""
source = context.get("source")
if source:
return str(source)
track_info = get_import_track_info(context)
source = _first_value(track_info, "source", default="")
if source:
return str(source)
original_search = get_import_original_search(context)
source = _first_value(original_search, "source", default="")
if source:
return str(source)
album = get_import_context_album(context)
source = _first_value(album, "source", default="")
if source:
return str(source)
artist = get_import_context_artist(context)
source = _first_value(artist, "source", default="")
return str(source) if source else ""
def get_import_clean_title(
context: Optional[Dict[str, Any]],
album_info: Optional[Dict[str, Any]] = None,
default: str = "Unknown Track",
) -> str:
original_search = get_import_original_search(context)
title = _first_value(
original_search,
"clean_title",
"title",
default="",
)
if not title and album_info:
title = _first_value(album_info, "clean_track_name", "track_name", default="")
if not title:
track_info = get_import_track_info(context)
title = _first_value(track_info, "name", "title", default="")
return str(title or default)
def get_import_clean_album(
context: Optional[Dict[str, Any]],
album_info: Optional[Dict[str, Any]] = None,
default: str = "Unknown Album",
) -> str:
original_search = get_import_original_search(context)
album = _first_value(
original_search,
"clean_album",
"album",
default="",
)
if not album and album_info:
album = _first_value(album_info, "album_name", "clean_album_name", default="")
if not album:
album_ctx = get_import_context_album(context)
album = _first_value(album_ctx, "name", default="")
return str(album or default)
def get_import_clean_artist(context: Optional[Dict[str, Any]], default: str = "Unknown Artist") -> str:
original_search = get_import_original_search(context)
artist = _first_value(
original_search,
"clean_artist",
"artist",
default="",
)
if not artist:
artist_ctx = get_import_context_artist(context)
artist = _first_value(artist_ctx, "name", default="")
return str(artist or default)
def get_import_has_clean_metadata(context: Optional[Dict[str, Any]]) -> bool:
if not isinstance(context, dict):
return False
return bool(context.get("has_clean_metadata", False))
def get_import_has_full_metadata(context: Optional[Dict[str, Any]]) -> bool:
if not isinstance(context, dict):
return False
return bool(context.get("has_full_metadata", False))
def get_import_source_ids(context: Optional[Dict[str, Any]]) -> Dict[str, str]:
source = get_import_source(context)
track_info = get_import_track_info(context)
original_search = get_import_original_search(context)
search_result = get_import_search_result(context)
artist = get_import_context_artist(context)
album = get_import_context_album(context)
return {
"track_id": _first_source_aware_id(
source,
_first_value(track_info, "id", "track_id", "trackId", "source_track_id", default=""),
_first_value(track_info, "spotify_track_id", "itunes_track_id", "deezer_id", "deezer_track_id", "discogs_id", "soul_id", default=""),
_first_value(original_search, "id", "track_id", "source_track_id", default=""),
_first_value(original_search, "spotify_track_id", "itunes_track_id", "deezer_id", "deezer_track_id", "discogs_id", "soul_id", default=""),
_first_value(search_result, "id", "track_id", "source_track_id", default=""),
_first_value(search_result, "spotify_track_id", "itunes_track_id", "deezer_id", "deezer_track_id", "discogs_id", "soul_id", default=""),
),
"artist_id": _first_source_aware_id(
source,
_first_value(artist, "id", "artist_id", "source_artist_id", default=""),
_first_value(artist, "spotify_artist_id", "itunes_artist_id", "deezer_id", "deezer_artist_id", "discogs_id", "soul_id", default=""),
_first_value(original_search, "artist_id", "source_artist_id", default=""),
_first_value(original_search, "spotify_artist_id", "itunes_artist_id", "deezer_id", "deezer_artist_id", "discogs_id", "soul_id", default=""),
_first_value(search_result, "artist_id", "source_artist_id", default=""),
_first_value(search_result, "spotify_artist_id", "itunes_artist_id", "deezer_id", "deezer_artist_id", "discogs_id", "soul_id", default=""),
),
"album_id": _first_source_aware_id(
source,
_first_value(album, "id", "album_id", "collectionId", "source_album_id", default=""),
_first_value(album, "spotify_album_id", "itunes_album_id", "deezer_id", "deezer_album_id", "discogs_id", "soul_id", "album_soul_id", "hydrabase_album_id", default=""),
_first_value(original_search, "album_id", "source_album_id", default=""),
_first_value(original_search, "spotify_album_id", "itunes_album_id", "deezer_id", "deezer_album_id", "discogs_id", "soul_id", "album_soul_id", "hydrabase_album_id", default=""),
_first_value(track_info, "album_id", "source_album_id", default=""),
_first_value(track_info, "spotify_album_id", "itunes_album_id", "deezer_id", "deezer_album_id", "discogs_id", "soul_id", "album_soul_id", "hydrabase_album_id", default=""),
_first_value(search_result, "album_id", "source_album_id", default=""),
_first_value(search_result, "spotify_album_id", "itunes_album_id", "deezer_id", "deezer_album_id", "discogs_id", "soul_id", "album_soul_id", "hydrabase_album_id", default=""),
),
}
def get_source_tag_names(source: str) -> Dict[str, Optional[str]]:
source_name = (source or "").strip().lower()
if source_name == "spotify":
return {"track": "SPOTIFY_TRACK_ID", "artist": "SPOTIFY_ARTIST_ID", "album": "SPOTIFY_ALBUM_ID"}
if source_name == "itunes":
return {"track": "ITUNES_TRACK_ID", "artist": "ITUNES_ARTIST_ID", "album": "ITUNES_ALBUM_ID"}
if source_name == "deezer":
return {"track": "DEEZER_TRACK_ID", "artist": "DEEZER_ARTIST_ID", "album": None}
if source_name == "hydrabase":
return {"track": None, "artist": None, "album": None}
if source_name == "discogs":
return {"track": None, "artist": None, "album": None}
if source_name == "hifi":
return {"track": "HIFI_TRACK_ID", "artist": "HIFI_ARTIST_ID", "album": None}
return {"track": None, "artist": None, "album": None}
def get_library_source_id_columns(source: str) -> Dict[str, Optional[str]]:
source_name = (source or "").strip().lower()
if source_name == "spotify":
return {"artist": "spotify_artist_id", "album": "spotify_album_id", "track": "spotify_track_id"}
if source_name == "itunes":
return {"artist": "itunes_artist_id", "album": "itunes_album_id", "track": "itunes_track_id"}
if source_name == "deezer":
return {"artist": "deezer_id", "album": "deezer_id", "track": "deezer_id"}
if source_name == "hydrabase":
return {"artist": "soul_id", "album": "soul_id", "track": "soul_id", "track_album": "album_soul_id"}
if source_name == "discogs":
return {"artist": "discogs_id", "album": "discogs_id", "track": None}
if source_name == "hifi":
return {"artist": "hifi_artist_id", "album": None, "track": "hifi_track_id"}
return {}
def build_import_album_info(
context: Optional[Dict[str, Any]],
*,
album_info: Optional[Dict[str, Any]] = None,
force_album: bool = False,
) -> Dict[str, Any]:
"""Build the album-info payload used by post-processing."""
album_ctx = get_import_context_album(context)
track_info = get_import_track_info(context)
original_search = get_import_original_search(context)
artist_ctx = get_import_context_artist(context)
track_number = (
(album_info or {}).get("track_number")
or track_info.get("track_number")
or original_search.get("track_number")
or 1
)
disc_number = (
(album_info or {}).get("disc_number")
or track_info.get("disc_number")
or original_search.get("disc_number")
or 1
)
clean_track_name = get_import_clean_title(context, album_info=album_info, default=original_search.get("title", "Unknown Track"))
album_name = get_import_clean_album(context, album_info=album_info, default=original_search.get("album", "Unknown Album"))
album_image_url = (
(album_info or {}).get("album_image_url")
or album_ctx.get("image_url")
or ""
)
total_tracks = (
album_ctx.get("total_tracks")
or track_info.get("total_tracks")
or (album_info or {}).get("total_tracks")
or 0
)
album_type = (album_ctx.get("album_type") or track_info.get("album_type") or "album")
source = get_import_source(context)
artist_name = artist_ctx.get("name") or original_search.get("artist") or get_import_clean_artist(context)
normalized_album = str(album_name or "").strip().lower()
normalized_title = str(clean_track_name or "").strip().lower()
normalized_artist = str(artist_name or "").strip().lower()
# Route through album_path when the metadata source has explicitly
# identified the release type (single / EP / compilation). The
# ``total_tracks > 1`` heuristic below catches normal multi-track
# albums even without explicit type info, but it can't catch
# singles (1 track, album name often equal to title) so they
# used to fall through to single_path — which doesn't honour the
# ``$albumtype`` template variable. Result: users with a
# ``${albumtype}s/...`` template saw an "Albums" folder and never
# any "Singles" or "EPs" folder. ``"album"`` is excluded from this
# check because it's the default fallback when album_type is
# missing — only treat values that came from a real source as
# explicit.
explicit_release_type = (album_type or "").strip().lower() in ("single", "ep", "compilation")
is_album = bool(
force_album
or explicit_release_type
or (
normalized_album
and total_tracks
and int(total_tracks) > 1
and normalized_album != normalized_title
and normalized_album != normalized_artist
)
)
return {
"is_album": is_album,
"album_name": album_name,
"track_number": int(track_number) if str(track_number).isdigit() else track_number,
"disc_number": int(disc_number) if str(disc_number).isdigit() else disc_number,
"clean_track_name": clean_track_name,
"album_image_url": album_image_url,
"confidence": (album_info or {}).get("confidence", 1.0 if is_album or force_album else 0.0),
"source": source,
"album_type": album_type,
"total_tracks": int(total_tracks) if str(total_tracks).isdigit() else total_tracks,
}
def detect_album_info_web(context, artist_context=None):
"""Best-effort album detection for single-track downloads."""
context = normalize_import_context(context)
if artist_context is None:
artist_context = context.get("artist") or {}
album_info = build_import_album_info(context)
if album_info.get("is_album"):
return album_info
album_ctx = get_import_context_album(context)
track_info = get_import_track_info(context)
original_search = get_import_original_search(context)
album_name = (
album_ctx.get("name")
or track_info.get("album")
or original_search.get("album")
or ""
)
track_name = (
track_info.get("name")
or original_search.get("title")
or ""
)
artist_name = extract_artist_name(artist_context) or get_import_clean_artist(context, default="")
if album_name and track_name and album_name.strip().lower() not in {
track_name.strip().lower(),
artist_name.strip().lower(),
}:
return build_import_album_info(
context,
album_info={
"album_name": album_name,
"track_number": track_info.get("track_number", 1),
"disc_number": track_info.get("disc_number", 1),
"album_image_url": album_ctx.get("image_url", ""),
"confidence": 0.5,
},
force_album=True,
)
return None