mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
606 lines
22 KiB
606 lines
22 KiB
"""Import an existing library file into a missing album slot.
|
|
|
|
This module keeps the "I Have This" behavior out of the Flask route layer:
|
|
copy the selected source file, post-process it with target album metadata,
|
|
inherit album identity tags from target siblings, and write the real DB row.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import uuid
|
|
from typing import Any, Callable, Dict, Optional
|
|
|
|
from core.library_reorganize import _build_post_process_context
|
|
|
|
|
|
logger = logging.getLogger("soulsync.library.missing_track_import")
|
|
|
|
|
|
class MissingTrackImportError(Exception):
|
|
"""Expected import failure that should be surfaced to the API caller."""
|
|
|
|
def __init__(self, message: str, status_code: int = 400):
|
|
super().__init__(message)
|
|
self.status_code = status_code
|
|
|
|
|
|
@dataclass
|
|
class MissingTrackImportDeps:
|
|
database: Any
|
|
config_manager: Any
|
|
post_process_fn: Callable[[str, dict, str], Any]
|
|
resolve_library_file_path_fn: Callable[[Optional[str]], Optional[str]]
|
|
docker_resolve_path_fn: Callable[[str], str]
|
|
sync_tracks_to_server_fn: Optional[Callable[[list, str], Any]] = None
|
|
service_id_columns: Optional[Dict[str, Dict[str, str]]] = None
|
|
|
|
|
|
_ALBUM_IDENTITY_TAGS = {
|
|
"album",
|
|
"albumartist",
|
|
"album_artist",
|
|
"date",
|
|
"year",
|
|
"tracktotal",
|
|
"totaltracks",
|
|
"totaldiscs",
|
|
"musicbrainz_albumid",
|
|
"musicbrainz_albumartistid",
|
|
"musicbrainz_releasegroupid",
|
|
"barcode",
|
|
"catalognumber",
|
|
"originaldate",
|
|
"releasecountry",
|
|
"releasestatus",
|
|
"releasetype",
|
|
"media",
|
|
"script",
|
|
"copyright",
|
|
"spotify_album_id",
|
|
"deezer_album_id",
|
|
"tidal_album_id",
|
|
"qobuz_album_id",
|
|
"itunes_album_id",
|
|
"audiodb_album_id",
|
|
}
|
|
|
|
_ID3_STANDARD_TAGS = {
|
|
"album": "TALB",
|
|
"albumartist": "TPE2",
|
|
"album_artist": "TPE2",
|
|
"date": "TDRC",
|
|
"year": "TDRC",
|
|
}
|
|
|
|
_ID3_TXXX_DESCS = {
|
|
"musicbrainz_albumid": "MusicBrainz Album Id",
|
|
"musicbrainz_albumartistid": "MusicBrainz Album Artist Id",
|
|
"musicbrainz_releasegroupid": "MusicBrainz Release Group Id",
|
|
"barcode": "BARCODE",
|
|
"catalognumber": "CATALOGNUMBER",
|
|
"originaldate": "ORIGINALDATE",
|
|
"releasecountry": "RELEASECOUNTRY",
|
|
"releasestatus": "RELEASESTATUS",
|
|
"releasetype": "RELEASETYPE",
|
|
"media": "MEDIA",
|
|
"script": "SCRIPT",
|
|
"totaldiscs": "TOTALDISCS",
|
|
"tracktotal": "TOTALTRACKS",
|
|
"totaltracks": "TOTALTRACKS",
|
|
"spotify_album_id": "Spotify Album Id",
|
|
"deezer_album_id": "Deezer Album Id",
|
|
"tidal_album_id": "Tidal Album Id",
|
|
"qobuz_album_id": "Qobuz Album Id",
|
|
"itunes_album_id": "iTunes Album Id",
|
|
"audiodb_album_id": "AudioDB Album Id",
|
|
}
|
|
|
|
_MP4_STANDARD_TAGS = {
|
|
"album": "\xa9alb",
|
|
"albumartist": "aART",
|
|
"album_artist": "aART",
|
|
"date": "\xa9day",
|
|
"year": "\xa9day",
|
|
}
|
|
|
|
|
|
def import_existing_track_for_album_slot(album_id: str, payload: dict, deps: MissingTrackImportDeps) -> dict:
|
|
source_track_id = payload.get("source_track_id") or payload.get("linked_track_id")
|
|
expected = payload.get("expected_track") or {}
|
|
if not source_track_id:
|
|
raise MissingTrackImportError("source_track_id is required", 400)
|
|
if not expected.get("track_number") or not (expected.get("title") or expected.get("name")):
|
|
raise MissingTrackImportError("expected_track with title and track_number is required", 400)
|
|
|
|
database = deps.database
|
|
album_data, source_track = _load_album_and_source_track(database, album_id, source_track_id)
|
|
if album_data.get("server_source") and source_track.get("server_source") and album_data["server_source"] != source_track["server_source"]:
|
|
raise MissingTrackImportError("Selected track belongs to a different library source", 400)
|
|
|
|
source_path = deps.resolve_library_file_path_fn(source_track.get("file_path"))
|
|
if not source_path:
|
|
raise MissingTrackImportError(_file_not_found_message(source_track.get("file_path")), 404)
|
|
|
|
staging_path = _copy_source_to_staging(source_path, album_id, expected, deps)
|
|
metadata_source = (expected.get("source") or payload.get("source") or "").strip().lower() or "library"
|
|
expected_title = expected.get("title") or expected.get("name") or "Unknown Track"
|
|
expected_track_id = _expected_track_id(expected)
|
|
album_source_id = _album_source_id(payload, expected, album_data, album_id)
|
|
|
|
api_track = _build_api_track(expected, expected_title, expected_track_id, album_source_id, metadata_source, album_data)
|
|
context = _build_context(payload, album_data, source_path, source_track_id, api_track, album_source_id, metadata_source)
|
|
|
|
context_key = f"existing_import_{album_id}_{api_track['disc_number']}_{api_track['track_number']}_{uuid.uuid4().hex[:8]}"
|
|
deps.post_process_fn(context_key, context, staging_path)
|
|
final_path = context.get("_final_processed_path")
|
|
if not final_path or not os.path.exists(final_path):
|
|
raise MissingTrackImportError("Post-processing did not produce a final file", 500)
|
|
|
|
ensure_tracks_disc_number_column(database)
|
|
copy_album_identity_from_target_sibling(
|
|
database,
|
|
album_id,
|
|
final_path,
|
|
api_track["disc_number"],
|
|
api_track["track_number"],
|
|
deps.resolve_library_file_path_fn,
|
|
)
|
|
|
|
target_track_id = _upsert_target_track(
|
|
database,
|
|
deps,
|
|
album_id,
|
|
album_data,
|
|
source_track,
|
|
final_path,
|
|
expected_title,
|
|
expected_track_id,
|
|
metadata_source,
|
|
api_track,
|
|
)
|
|
_sync_imported_track(deps, target_track_id, expected_title, album_data)
|
|
|
|
return {
|
|
"track_id": target_track_id,
|
|
"final_path": final_path,
|
|
"artist_id": album_data.get("target_artist_id"),
|
|
}
|
|
|
|
|
|
def _load_album_and_source_track(database, album_id: str, source_track_id: str) -> tuple[dict, dict]:
|
|
with database._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"""
|
|
SELECT al.*, ar.name AS artist_name, ar.id AS target_artist_id
|
|
FROM albums al
|
|
JOIN artists ar ON ar.id = al.artist_id
|
|
WHERE al.id = ?
|
|
""",
|
|
(album_id,),
|
|
)
|
|
album_row = cursor.fetchone()
|
|
if not album_row:
|
|
raise MissingTrackImportError("Album not found", 404)
|
|
|
|
cursor.execute("SELECT * FROM tracks WHERE id = ?", (source_track_id,))
|
|
source_row = cursor.fetchone()
|
|
if not source_row:
|
|
raise MissingTrackImportError("Selected library track not found", 404)
|
|
|
|
return dict(album_row), dict(source_row)
|
|
|
|
|
|
def _copy_source_to_staging(source_path: str, album_id: str, expected: dict, deps: MissingTrackImportDeps) -> str:
|
|
download_dir = deps.docker_resolve_path_fn(deps.config_manager.get("soulseek.download_path", "./downloads"))
|
|
staging_root = os.path.join(download_dir, "ssync_existing_import")
|
|
os.makedirs(staging_root, exist_ok=True)
|
|
source_ext = os.path.splitext(source_path)[1] or ".audio"
|
|
staging_name = (
|
|
f"existing_{album_id}_{expected.get('disc_number') or 1}_"
|
|
f"{expected.get('track_number')}_{uuid.uuid4().hex[:8]}{source_ext}"
|
|
)
|
|
staging_path = os.path.join(staging_root, staging_name)
|
|
shutil.copy2(source_path, staging_path)
|
|
return staging_path
|
|
|
|
|
|
def _build_api_track(
|
|
expected: dict,
|
|
expected_title: str,
|
|
expected_track_id: str,
|
|
album_source_id: str,
|
|
metadata_source: str,
|
|
album_data: dict,
|
|
) -> dict:
|
|
return {
|
|
"id": expected_track_id,
|
|
"track_id": expected_track_id,
|
|
"name": expected_title,
|
|
"title": expected_title,
|
|
"track_number": int(expected.get("track_number") or 1),
|
|
"disc_number": int(expected.get("disc_number") or 1),
|
|
"duration_ms": int(expected.get("duration") or expected.get("duration_ms") or 0),
|
|
"artists": expected.get("artists") or [album_data.get("artist_name") or ""],
|
|
"source": metadata_source,
|
|
"album_id": album_source_id,
|
|
"spotify_track_id": expected.get("spotify_track_id") or "",
|
|
"deezer_id": expected.get("deezer_id") or "",
|
|
"itunes_track_id": expected.get("itunes_track_id") or "",
|
|
"musicbrainz_recording_id": expected.get("musicbrainz_recording_id") or "",
|
|
}
|
|
|
|
|
|
def _build_context(
|
|
payload: dict,
|
|
album_data: dict,
|
|
source_path: str,
|
|
source_track_id: str,
|
|
api_track: dict,
|
|
album_source_id: str,
|
|
metadata_source: str,
|
|
) -> dict:
|
|
api_album = {
|
|
"id": album_source_id,
|
|
"name": album_data.get("title") or "",
|
|
"title": album_data.get("title") or "",
|
|
"release_date": f"{album_data.get('year')}-01-01" if album_data.get("year") else "",
|
|
"total_tracks": album_data.get("api_track_count") or album_data.get("track_count") or 0,
|
|
"image_url": album_data.get("thumb_url") or "",
|
|
"source": metadata_source,
|
|
}
|
|
context = _build_post_process_context(
|
|
api_album,
|
|
api_track,
|
|
album_data.get("artist_name") or "",
|
|
album_data.get("title") or "",
|
|
int(payload.get("total_discs") or payload.get("expected_track", {}).get("total_discs") or 1),
|
|
)
|
|
context["source"] = metadata_source
|
|
context["source_service"] = "existing_library"
|
|
context["source_filename"] = os.path.basename(source_path)
|
|
context["source_size"] = os.path.getsize(source_path) if os.path.exists(source_path) else 0
|
|
context["explicit_album_context"] = True
|
|
context["from_existing_library_track"] = True
|
|
context["batch_id"] = f"existing_import_{album_data.get('id')}_{uuid.uuid4().hex[:8]}"
|
|
context["task_id"] = f"existing_import_{source_track_id}"
|
|
return context
|
|
|
|
|
|
def _upsert_target_track(
|
|
database,
|
|
deps: MissingTrackImportDeps,
|
|
album_id: str,
|
|
album_data: dict,
|
|
source_track: dict,
|
|
final_path: str,
|
|
expected_title: str,
|
|
expected_track_id: str,
|
|
metadata_source: str,
|
|
api_track: dict,
|
|
):
|
|
file_size, bitrate = _read_file_stats(final_path, source_track)
|
|
server_source = album_data.get("server_source") or source_track.get("server_source") or deps.config_manager.get_active_media_server()
|
|
|
|
with database._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
_ensure_disc_number_column(cursor, conn)
|
|
|
|
cursor.execute("SELECT id FROM tracks WHERE file_path = ? LIMIT 1", (final_path,))
|
|
existing_by_path = cursor.fetchone()
|
|
cursor.execute(
|
|
"""
|
|
SELECT id FROM tracks
|
|
WHERE album_id = ? AND COALESCE(disc_number, 1) = ? AND track_number = ?
|
|
LIMIT 1
|
|
""",
|
|
(album_id, api_track["disc_number"], api_track["track_number"]),
|
|
)
|
|
existing_target = cursor.fetchone()
|
|
|
|
if existing_by_path:
|
|
target_track_id = existing_by_path["id"]
|
|
cursor.execute(
|
|
"""
|
|
UPDATE tracks
|
|
SET album_id = ?, artist_id = ?, title = ?, track_number = ?, disc_number = ?,
|
|
duration = ?, file_path = ?, bitrate = ?, file_size = ?,
|
|
server_source = COALESCE(server_source, ?),
|
|
updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
""",
|
|
(
|
|
album_id,
|
|
album_data.get("target_artist_id"),
|
|
expected_title,
|
|
api_track["track_number"],
|
|
api_track["disc_number"],
|
|
api_track["duration_ms"],
|
|
final_path,
|
|
bitrate,
|
|
file_size,
|
|
server_source,
|
|
target_track_id,
|
|
),
|
|
)
|
|
elif existing_target:
|
|
target_track_id = existing_target["id"]
|
|
cursor.execute(
|
|
"""
|
|
UPDATE tracks
|
|
SET title = ?, duration = ?, file_path = ?, bitrate = ?, file_size = ?,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
""",
|
|
(expected_title, api_track["duration_ms"], final_path, bitrate, file_size, target_track_id),
|
|
)
|
|
else:
|
|
cursor.execute("SELECT COALESCE(MAX(CAST(id AS INTEGER)), 0) + 1 AS next_id FROM tracks")
|
|
target_track_id = cursor.fetchone()["next_id"]
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO tracks (
|
|
id, album_id, artist_id, title, track_number, disc_number, duration,
|
|
file_path, bitrate, file_size, server_source, created_at, updated_at
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
""",
|
|
(
|
|
target_track_id,
|
|
album_id,
|
|
album_data.get("target_artist_id"),
|
|
expected_title,
|
|
api_track["track_number"],
|
|
api_track["disc_number"],
|
|
api_track["duration_ms"],
|
|
final_path,
|
|
bitrate,
|
|
file_size,
|
|
server_source,
|
|
),
|
|
)
|
|
|
|
track_source_col = (deps.service_id_columns or {}).get(metadata_source, {}).get("track")
|
|
if track_source_col and expected_track_id:
|
|
try:
|
|
cursor.execute(f"UPDATE tracks SET {track_source_col} = ? WHERE id = ?", (expected_track_id, target_track_id))
|
|
except Exception as source_err:
|
|
logger.debug("Imported track source-id update failed: %s", source_err)
|
|
|
|
conn.commit()
|
|
|
|
return target_track_id
|
|
|
|
|
|
def _ensure_disc_number_column(cursor, conn) -> None:
|
|
cursor.execute("PRAGMA table_info(tracks)")
|
|
track_columns = {row[1] for row in cursor.fetchall()}
|
|
if "disc_number" not in track_columns:
|
|
cursor.execute("ALTER TABLE tracks ADD COLUMN disc_number INTEGER DEFAULT 1")
|
|
conn.commit()
|
|
|
|
|
|
def ensure_tracks_disc_number_column(database) -> None:
|
|
with database._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
_ensure_disc_number_column(cursor, conn)
|
|
|
|
|
|
def _read_file_stats(final_path: str, source_track: dict) -> tuple[Optional[int], int]:
|
|
file_size = None
|
|
bitrate = source_track.get("bitrate") or 0
|
|
try:
|
|
file_size = os.path.getsize(final_path)
|
|
from mutagen import File as MutagenFile
|
|
|
|
audio = MutagenFile(final_path)
|
|
if audio and getattr(audio, "info", None) and getattr(audio.info, "bitrate", None):
|
|
bitrate = int(audio.info.bitrate / 1000)
|
|
except Exception as meta_err:
|
|
logger.debug("Existing-track import metadata read failed: %s", meta_err)
|
|
return file_size, bitrate
|
|
|
|
|
|
def _sync_imported_track(deps: MissingTrackImportDeps, track_id, expected_title: str, album_data: dict) -> None:
|
|
try:
|
|
active_server = deps.config_manager.get_active_media_server()
|
|
if deps.sync_tracks_to_server_fn and active_server in ("jellyfin", "navidrome"):
|
|
deps.sync_tracks_to_server_fn(
|
|
[
|
|
{
|
|
"id": track_id,
|
|
"title": expected_title,
|
|
"artist_name": album_data.get("artist_name"),
|
|
"album_title": album_data.get("title"),
|
|
"year": album_data.get("year"),
|
|
"server_source": album_data.get("server_source"),
|
|
}
|
|
],
|
|
active_server,
|
|
)
|
|
except Exception as sync_err:
|
|
logger.debug("Existing-track import server sync skipped/failed: %s", sync_err)
|
|
|
|
|
|
def copy_album_identity_from_target_sibling(
|
|
database,
|
|
album_id: str,
|
|
final_path: str,
|
|
target_disc: int,
|
|
target_track: int,
|
|
resolve_library_file_path_fn: Callable[[Optional[str]], Optional[str]],
|
|
) -> bool:
|
|
try:
|
|
with database._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"""
|
|
SELECT file_path FROM tracks
|
|
WHERE album_id = ?
|
|
AND file_path IS NOT NULL
|
|
AND file_path != ''
|
|
AND NOT (COALESCE(disc_number, 1) = ? AND track_number = ?)
|
|
ORDER BY COALESCE(disc_number, 1), track_number
|
|
LIMIT 12
|
|
""",
|
|
(album_id, target_disc, target_track),
|
|
)
|
|
sibling_rows = cursor.fetchall()
|
|
|
|
for row in sibling_rows:
|
|
sibling_path = resolve_library_file_path_fn(row["file_path"])
|
|
if not sibling_path or not os.path.exists(sibling_path):
|
|
continue
|
|
tags = read_album_identity_tags(sibling_path)
|
|
if not tags:
|
|
continue
|
|
if write_album_identity_tags(final_path, tags):
|
|
logger.info("Imported track inherited album identity tags from sibling: %s", os.path.basename(sibling_path))
|
|
return True
|
|
except Exception as exc:
|
|
logger.warning("Failed to inherit album identity tags for imported track: %s", exc)
|
|
return False
|
|
|
|
|
|
def read_album_identity_tags(file_path: str) -> dict:
|
|
try:
|
|
from mutagen import File as MutagenFile
|
|
from mutagen.id3 import ID3, TXXX
|
|
from mutagen.mp4 import MP4
|
|
|
|
audio = MutagenFile(file_path)
|
|
if not audio:
|
|
return {}
|
|
|
|
tags = {}
|
|
if isinstance(getattr(audio, "tags", None), ID3):
|
|
for tag_key, frame_id in _ID3_STANDARD_TAGS.items():
|
|
frames = audio.tags.getall(frame_id)
|
|
if frames and getattr(frames[0], "text", None):
|
|
tags[tag_key] = str(frames[0].text[0]).strip()
|
|
desc_to_key = {desc: key for key, desc in _ID3_TXXX_DESCS.items()}
|
|
for frame in audio.tags.getall("TXXX"):
|
|
if isinstance(frame, TXXX) and frame.desc in desc_to_key and frame.text:
|
|
tags[desc_to_key[frame.desc]] = str(frame.text[0]).strip()
|
|
elif isinstance(audio, MP4):
|
|
for tag_key, mp4_key in _MP4_STANDARD_TAGS.items():
|
|
value = _first_tag_value(audio, mp4_key)
|
|
if value:
|
|
tags[tag_key] = value
|
|
for tag_key, desc in _ID3_TXXX_DESCS.items():
|
|
value = _first_tag_value(audio, f"----:com.apple.iTunes:{desc}")
|
|
if value:
|
|
tags[tag_key] = value
|
|
else:
|
|
for tag_key in _ALBUM_IDENTITY_TAGS:
|
|
value = _first_tag_value(audio, tag_key)
|
|
if value:
|
|
tags[tag_key] = value
|
|
return {k: v for k, v in tags.items() if v}
|
|
except Exception as exc:
|
|
logger.debug("Failed reading album identity tags from %s: %s", file_path, exc)
|
|
return {}
|
|
|
|
|
|
def write_album_identity_tags(file_path: str, tags: dict) -> bool:
|
|
if not tags:
|
|
return False
|
|
try:
|
|
from mutagen import File as MutagenFile
|
|
from mutagen.id3 import ID3, TALB, TDRC, TPE2, TXXX
|
|
from mutagen.mp4 import MP4, MP4FreeForm
|
|
|
|
audio = MutagenFile(file_path)
|
|
if not audio:
|
|
return False
|
|
|
|
tags = {k: str(v).strip() for k, v in tags.items() if k in _ALBUM_IDENTITY_TAGS and str(v).strip()}
|
|
if not tags:
|
|
return False
|
|
|
|
if isinstance(getattr(audio, "tags", None), ID3):
|
|
standard_frames = {"TALB": TALB, "TPE2": TPE2, "TDRC": TDRC}
|
|
written_standard = set()
|
|
for tag_key, frame_id in _ID3_STANDARD_TAGS.items():
|
|
value = tags.get(tag_key)
|
|
if not value or frame_id in written_standard:
|
|
continue
|
|
audio.tags.delall(frame_id)
|
|
audio.tags.add(standard_frames[frame_id](encoding=3, text=[value]))
|
|
written_standard.add(frame_id)
|
|
for tag_key, desc in _ID3_TXXX_DESCS.items():
|
|
value = tags.get(tag_key)
|
|
if not value:
|
|
continue
|
|
for existing in list(audio.tags.getall("TXXX")):
|
|
if getattr(existing, "desc", None) == desc:
|
|
audio.tags.remove(existing.HashKey)
|
|
audio.tags.add(TXXX(encoding=3, desc=desc, text=[value]))
|
|
elif isinstance(audio, MP4):
|
|
for tag_key, mp4_key in _MP4_STANDARD_TAGS.items():
|
|
if tags.get(tag_key):
|
|
audio[mp4_key] = [tags[tag_key]]
|
|
for tag_key, desc in _ID3_TXXX_DESCS.items():
|
|
if tags.get(tag_key):
|
|
audio[f"----:com.apple.iTunes:{desc}"] = [MP4FreeForm(tags[tag_key].encode("utf-8"))]
|
|
else:
|
|
for tag_key, value in tags.items():
|
|
audio[tag_key] = [value]
|
|
|
|
audio.save()
|
|
return True
|
|
except Exception as exc:
|
|
logger.warning("Failed writing album identity tags to %s: %s", file_path, exc)
|
|
return False
|
|
|
|
|
|
def _first_tag_value(audio, key: str) -> Optional[str]:
|
|
try:
|
|
values = audio.get(key)
|
|
if not values:
|
|
return None
|
|
value = values[0] if isinstance(values, (list, tuple)) else values
|
|
if isinstance(value, bytes):
|
|
value = value.decode("utf-8", errors="ignore")
|
|
return str(value).strip() or None
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _expected_track_id(expected: dict) -> str:
|
|
return (
|
|
expected.get("track_id")
|
|
or expected.get("id")
|
|
or expected.get("source_track_id")
|
|
or expected.get("spotify_track_id")
|
|
or expected.get("deezer_id")
|
|
or expected.get("itunes_track_id")
|
|
or expected.get("musicbrainz_recording_id")
|
|
or ""
|
|
)
|
|
|
|
|
|
def _album_source_id(payload: dict, expected: dict, album_data: dict, album_id: str) -> str:
|
|
return (
|
|
payload.get("album_source_id")
|
|
or expected.get("album_id")
|
|
or album_data.get("spotify_album_id")
|
|
or album_data.get("deezer_id")
|
|
or album_data.get("itunes_album_id")
|
|
or album_data.get("musicbrainz_release_id")
|
|
or album_data.get("discogs_id")
|
|
or album_data.get("tidal_id")
|
|
or album_data.get("qobuz_id")
|
|
or str(album_id)
|
|
)
|
|
|
|
|
|
def _file_not_found_message(file_path: Optional[str]) -> str:
|
|
if file_path:
|
|
return f"File not found: {file_path}"
|
|
return "Selected library track does not have a file path"
|