You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
SoulSync/core/library/missing_track_import.py

606 lines
22 KiB

"""Import an existing library file into a missing album slot.
This module keeps the "I Have This" behavior out of the Flask route layer:
copy the selected source file, post-process it with target album metadata,
inherit album identity tags from target siblings, and write the real DB row.
"""
from __future__ import annotations
from dataclasses import dataclass
import logging
import os
import shutil
import uuid
from typing import Any, Callable, Dict, Optional
from core.library_reorganize import _build_post_process_context
logger = logging.getLogger("soulsync.library.missing_track_import")
class MissingTrackImportError(Exception):
    """Anticipated import failure whose message is meant for the API caller.

    Attributes:
        status_code: Status code carried with the error. Defaults to 400;
            this module also raises it with 404 and 500.
    """

    def __init__(self, message: str, status_code: int = 400):
        # Record the status first; the base class only needs the message.
        self.status_code = status_code
        super().__init__(message)
@dataclass
class MissingTrackImportDeps:
    """Collaborators the import flow needs, supplied by the caller.

    Bundling them here lets the import logic run without importing the
    web/route layer directly.
    """

    # Database wrapper. This module only calls ``database._get_connection()``
    # and uses the result as a context manager yielding a DB-API connection.
    database: Any
    # Configuration accessor. Used via ``get(key, default)`` (for
    # "soulseek.download_path") and ``get_active_media_server()``.
    config_manager: Any
    # Invoked as ``post_process_fn(context_key, context, staging_path)``.
    # Afterwards the flow reads ``context["_final_processed_path"]``.
    post_process_fn: Callable[[str, dict, str], Any]
    # Maps a stored track ``file_path`` to a usable filesystem path; a falsy
    # result is treated as "file not found".
    resolve_library_file_path_fn: Callable[[Optional[str]], Optional[str]]
    # Applied to the configured download directory before staging files there.
    docker_resolve_path_fn: Callable[[str], str]
    # Optional. Called with a list of track dicts and the active server name,
    # only when that server is "jellyfin" or "navidrome".
    sync_tracks_to_server_fn: Optional[Callable[[list, str], Any]] = None
    # Optional ``{metadata_source: {"track": column_name}}`` mapping used to
    # store the provider track id on the imported row.
    service_id_columns: Optional[Dict[str, Dict[str, str]]] = None
# Tag keys treated as album-level identity. They serve two purposes in this
# module: the keys looked up when reading tags through the generic
# (non-ID3, non-MP4) path, and the allow-list applied before writing tags.
_ALBUM_IDENTITY_TAGS = {
    "album",
    "albumartist",
    "album_artist",
    "date",
    "year",
    "tracktotal",
    "totaltracks",
    "totaldiscs",
    "musicbrainz_albumid",
    "musicbrainz_albumartistid",
    "musicbrainz_releasegroupid",
    "barcode",
    "catalognumber",
    "originaldate",
    "releasecountry",
    "releasestatus",
    "releasetype",
    "media",
    "script",
    "copyright",
    "spotify_album_id",
    "deezer_album_id",
    "tidal_album_id",
    "qobuz_album_id",
    "itunes_album_id",
    "audiodb_album_id",
}
# Tag key -> ID3 frame id for the identity tags that have a dedicated frame.
# Alias keys deliberately share a frame (albumartist/album_artist -> TPE2,
# date/year -> TDRC).
_ID3_STANDARD_TAGS = {
    "album": "TALB",
    "albumartist": "TPE2",
    "album_artist": "TPE2",
    "date": "TDRC",
    "year": "TDRC",
}
# Tag key -> description string used for ID3 ``TXXX`` frames. The same
# descriptions are reused as MP4 freeform atom names
# (``----:com.apple.iTunes:<description>``) by the read/write helpers.
# "tracktotal" and "totaltracks" intentionally share one description.
_ID3_TXXX_DESCS = {
    "musicbrainz_albumid": "MusicBrainz Album Id",
    "musicbrainz_albumartistid": "MusicBrainz Album Artist Id",
    "musicbrainz_releasegroupid": "MusicBrainz Release Group Id",
    "barcode": "BARCODE",
    "catalognumber": "CATALOGNUMBER",
    "originaldate": "ORIGINALDATE",
    "releasecountry": "RELEASECOUNTRY",
    "releasestatus": "RELEASESTATUS",
    "releasetype": "RELEASETYPE",
    "media": "MEDIA",
    "script": "SCRIPT",
    "totaldiscs": "TOTALDISCS",
    "tracktotal": "TOTALTRACKS",
    "totaltracks": "TOTALTRACKS",
    "spotify_album_id": "Spotify Album Id",
    "deezer_album_id": "Deezer Album Id",
    "tidal_album_id": "Tidal Album Id",
    "qobuz_album_id": "Qobuz Album Id",
    "itunes_album_id": "iTunes Album Id",
    "audiodb_album_id": "AudioDB Album Id",
}
# Tag key -> MP4 atom key for the identity tags stored in standard atoms.
# Alias keys share an atom (albumartist/album_artist -> aART,
# date/year -> \xa9day).
_MP4_STANDARD_TAGS = {
    "album": "\xa9alb",
    "albumartist": "aART",
    "album_artist": "aART",
    "date": "\xa9day",
    "year": "\xa9day",
}
def import_existing_track_for_album_slot(album_id: str, payload: dict, deps: MissingTrackImportDeps) -> dict:
    """Fill a missing album slot with a copy of a track already in the library.

    The flow is: validate the request, load the album and source-track rows,
    resolve the source file, build the post-processing context, copy the
    source file to a staging directory, run post-processing on the copy,
    inherit album identity tags from a sibling track, upsert the track row
    and finally attempt a media-server sync.

    Args:
        album_id: Id of the album that is missing the track.
        payload: Request body. Reads ``source_track_id`` (or
            ``linked_track_id``) and ``expected_track`` (needs
            ``track_number`` and ``title``/``name``); optionally ``source``,
            ``album_source_id`` and ``total_discs``.
        deps: Injected collaborators (database, config, callbacks).

    Returns:
        A dict with ``track_id`` (row that now represents the slot),
        ``final_path`` (post-processed file) and ``artist_id``.

    Raises:
        MissingTrackImportError: 400 for invalid requests or mismatching
            library sources, 404 when the album, source track or source file
            cannot be found, 500 when post-processing yields no final file.
    """
    source_track_id = payload.get("source_track_id") or payload.get("linked_track_id")
    expected = payload.get("expected_track") or {}
    if not source_track_id:
        raise MissingTrackImportError("source_track_id is required", 400)
    # A non-mapping expected_track (list, string, ...) would otherwise fail
    # with AttributeError on the .get() calls instead of a clean 400.
    if (
        not isinstance(expected, dict)
        or not expected.get("track_number")
        or not (expected.get("title") or expected.get("name"))
    ):
        raise MissingTrackImportError("expected_track with title and track_number is required", 400)

    database = deps.database
    album_data, source_track = _load_album_and_source_track(database, album_id, source_track_id)
    if album_data.get("server_source") and source_track.get("server_source") and album_data["server_source"] != source_track["server_source"]:
        raise MissingTrackImportError("Selected track belongs to a different library source", 400)

    source_path = deps.resolve_library_file_path_fn(source_track.get("file_path"))
    if not source_path:
        raise MissingTrackImportError(_file_not_found_message(source_track.get("file_path")), 404)

    metadata_source = (expected.get("source") or payload.get("source") or "").strip().lower() or "library"
    expected_title = expected.get("title") or expected.get("name") or "Unknown Track"
    expected_track_id = _expected_track_id(expected)
    album_source_id = _album_source_id(payload, expected, album_data, album_id)
    try:
        api_track = _build_api_track(expected, expected_title, expected_track_id, album_source_id, metadata_source, album_data)
    except (TypeError, ValueError) as exc:
        # _build_api_track only fails on its int() conversions, i.e. on
        # caller-supplied values; report that as a client error.
        raise MissingTrackImportError(
            "expected_track track_number, disc_number and duration must be numeric", 400
        ) from exc
    context = _build_context(payload, album_data, source_path, source_track_id, api_track, album_source_id, metadata_source)
    context_key = f"existing_import_{album_id}_{api_track['disc_number']}_{api_track['track_number']}_{uuid.uuid4().hex[:8]}"

    # Copy the source file only once everything above has succeeded, so a
    # rejected request does not leave an orphaned copy in the staging folder.
    staging_path = _copy_source_to_staging(source_path, album_id, expected, deps)
    deps.post_process_fn(context_key, context, staging_path)
    final_path = context.get("_final_processed_path")
    if not final_path or not os.path.exists(final_path):
        raise MissingTrackImportError("Post-processing did not produce a final file", 500)

    ensure_tracks_disc_number_column(database)
    copy_album_identity_from_target_sibling(
        database,
        album_id,
        final_path,
        api_track["disc_number"],
        api_track["track_number"],
        deps.resolve_library_file_path_fn,
    )
    target_track_id = _upsert_target_track(
        database,
        deps,
        album_id,
        album_data,
        source_track,
        final_path,
        expected_title,
        expected_track_id,
        metadata_source,
        api_track,
    )
    _sync_imported_track(deps, target_track_id, expected_title, album_data)
    return {
        "track_id": target_track_id,
        "final_path": final_path,
        "artist_id": album_data.get("target_artist_id"),
    }
def _load_album_and_source_track(database, album_id: str, source_track_id: str) -> tuple[dict, dict]:
    """Fetch the target album (joined with its artist) and the source track.

    Args:
        database: Object exposing ``_get_connection()`` as a context manager.
        album_id: Id of the album row to load.
        source_track_id: Id of the track row selected as the source.

    Returns:
        ``(album, track)`` as plain dicts. The album dict additionally holds
        ``artist_name`` and ``target_artist_id`` from the joined artist row.

    Raises:
        MissingTrackImportError: 404 when either row does not exist.
    """
    album_query = """
        SELECT al.*, ar.name AS artist_name, ar.id AS target_artist_id
        FROM albums al
        JOIN artists ar ON ar.id = al.artist_id
        WHERE al.id = ?
    """
    track_query = "SELECT * FROM tracks WHERE id = ?"

    with database._get_connection() as conn:
        cur = conn.cursor()

        cur.execute(album_query, (album_id,))
        album = cur.fetchone()
        if album is None or not album:
            raise MissingTrackImportError("Album not found", 404)

        cur.execute(track_query, (source_track_id,))
        track = cur.fetchone()
        if track is None or not track:
            raise MissingTrackImportError("Selected library track not found", 404)

        return dict(album), dict(track)
def _copy_source_to_staging(source_path: str, album_id: str, expected: dict, deps: MissingTrackImportDeps) -> str:
    """Copy the source file into the staging folder under a unique name.

    The staging folder is ``ssync_existing_import`` inside the resolved
    "soulseek.download_path" setting (default ``./downloads``) and is created
    when missing. The file name encodes album id, disc number (1 when
    absent), track number and a random 8-character suffix; the source
    extension is kept, or ``.audio`` is used when there is none.

    Returns:
        Path of the newly created staging copy.
    """
    configured_dir = deps.config_manager.get("soulseek.download_path", "./downloads")
    staging_root = os.path.join(deps.docker_resolve_path_fn(configured_dir), "ssync_existing_import")
    os.makedirs(staging_root, exist_ok=True)

    _, extension = os.path.splitext(source_path)
    disc_part = expected.get("disc_number") or 1
    track_part = expected.get("track_number")
    unique_part = uuid.uuid4().hex[:8]
    file_name = f"existing_{album_id}_{disc_part}_{track_part}_{unique_part}{extension or '.audio'}"

    destination = os.path.join(staging_root, file_name)
    # copy2 keeps file metadata (timestamps) along with the content.
    shutil.copy2(source_path, destination)
    return destination
def _build_api_track(
    expected: dict,
    expected_title: str,
    expected_track_id: str,
    album_source_id: str,
    metadata_source: str,
    album_data: dict,
) -> dict:
    """Assemble the track dict handed to post-processing for the expected slot.

    Track and disc numbers default to 1 and the duration to 0 when missing;
    ``duration`` takes precedence over ``duration_ms``. Artists fall back to
    a single-item list holding the album's ``artist_name`` (or ``""``).
    Provider ids absent from ``expected`` become empty strings.

    Raises:
        ValueError / TypeError: When a numeric field cannot be converted by
            ``int()``.
    """
    track_no = int(expected.get("track_number") or 1)
    disc_no = int(expected.get("disc_number") or 1)
    length = int(expected.get("duration") or expected.get("duration_ms") or 0)
    performers = expected.get("artists") or [album_data.get("artist_name") or ""]

    api_track = {
        "id": expected_track_id,
        "track_id": expected_track_id,
        "name": expected_title,
        "title": expected_title,
        "track_number": track_no,
        "disc_number": disc_no,
        "duration_ms": length,
        "artists": performers,
        "source": metadata_source,
        "album_id": album_source_id,
    }
    # Provider-specific ids are passed through verbatim, "" when absent.
    for id_field in ("spotify_track_id", "deezer_id", "itunes_track_id", "musicbrainz_recording_id"):
        api_track[id_field] = expected.get(id_field) or ""
    return api_track
def _build_context(
    payload: dict,
    album_data: dict,
    source_path: str,
    source_track_id: str,
    api_track: dict,
    album_source_id: str,
    metadata_source: str,
) -> dict:
    """Create the post-processing context for an existing-library import.

    An album description is derived from ``album_data`` and passed, together
    with ``api_track``, to ``_build_post_process_context``. The returned
    context is then annotated with provenance fields marking the file as
    coming from the existing library.

    Returns:
        The context object returned by ``_build_post_process_context``, with
        the extra keys set.
    """
    album_title = album_data.get("title") or ""
    album_year = album_data.get("year")
    api_album = {
        "id": album_source_id,
        "name": album_title,
        "title": album_title,
        # Only the year is known here, so the date is pinned to January 1st.
        "release_date": f"{album_year}-01-01" if album_year else "",
        "total_tracks": album_data.get("api_track_count") or album_data.get("track_count") or 0,
        "image_url": album_data.get("thumb_url") or "",
        "source": metadata_source,
    }
    total_discs = int(payload.get("total_discs") or payload.get("expected_track", {}).get("total_discs") or 1)

    context = _build_post_process_context(
        api_album,
        api_track,
        album_data.get("artist_name") or "",
        album_title,
        total_discs,
    )

    source_size = os.path.getsize(source_path) if os.path.exists(source_path) else 0
    provenance = {
        "source": metadata_source,
        "source_service": "existing_library",
        "source_filename": os.path.basename(source_path),
        "source_size": source_size,
        "explicit_album_context": True,
        "from_existing_library_track": True,
        "batch_id": f"existing_import_{album_data.get('id')}_{uuid.uuid4().hex[:8]}",
        "task_id": f"existing_import_{source_track_id}",
    }
    for field, value in provenance.items():
        context[field] = value
    return context
def _upsert_target_track(
    database,
    deps: MissingTrackImportDeps,
    album_id: str,
    album_data: dict,
    source_track: dict,
    final_path: str,
    expected_title: str,
    expected_track_id: str,
    metadata_source: str,
    api_track: dict,
):
    """Write the track row that represents the imported file.

    Exactly one of three things happens, checked in this order:

    1. A row already points at ``final_path``: it is re-assigned to the
       target album/artist/slot and its file stats are refreshed.
    2. A row already occupies the album slot (album + disc + track number):
       its title, duration, path and file stats are updated.
    3. Otherwise a new row is inserted with an id one above the highest
       integer-cast id currently in ``tracks``.

    When ``deps.service_id_columns`` names a track column for
    ``metadata_source`` and ``expected_track_id`` is set, that column is
    updated too; a failure there is logged at debug level and ignored.

    Returns:
        The id of the row that was updated or inserted.
    """
    file_size, bitrate = _read_file_stats(final_path, source_track)
    # Prefer the album's library source, then the source track's, then the
    # currently active media server.
    server_source = album_data.get("server_source") or source_track.get("server_source") or deps.config_manager.get_active_media_server()
    with database._get_connection() as conn:
        cursor = conn.cursor()
        _ensure_disc_number_column(cursor, conn)
        # Lookup 1: is there already a row for this exact file?
        cursor.execute("SELECT id FROM tracks WHERE file_path = ? LIMIT 1", (final_path,))
        existing_by_path = cursor.fetchone()
        # Lookup 2: is the album slot already occupied? NULL disc numbers
        # count as disc 1.
        cursor.execute(
            """
            SELECT id FROM tracks
            WHERE album_id = ? AND COALESCE(disc_number, 1) = ? AND track_number = ?
            LIMIT 1
            """,
            (album_id, api_track["disc_number"], api_track["track_number"]),
        )
        existing_target = cursor.fetchone()
        if existing_by_path:
            # The path match wins over the slot match.
            target_track_id = existing_by_path["id"]
            cursor.execute(
                """
                UPDATE tracks
                SET album_id = ?, artist_id = ?, title = ?, track_number = ?, disc_number = ?,
                duration = ?, file_path = ?, bitrate = ?, file_size = ?,
                server_source = COALESCE(server_source, ?),
                updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
                """,
                (
                    album_id,
                    album_data.get("target_artist_id"),
                    expected_title,
                    api_track["track_number"],
                    api_track["disc_number"],
                    # The ``duration`` column receives the millisecond value.
                    api_track["duration_ms"],
                    final_path,
                    bitrate,
                    file_size,
                    # Only applied when the row has no server_source yet.
                    server_source,
                    target_track_id,
                ),
            )
        elif existing_target:
            # Slot row exists: album, artist and numbering are left untouched.
            target_track_id = existing_target["id"]
            cursor.execute(
                """
                UPDATE tracks
                SET title = ?, duration = ?, file_path = ?, bitrate = ?, file_size = ?,
                updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
                """,
                (expected_title, api_track["duration_ms"], final_path, bitrate, file_size, target_track_id),
            )
        else:
            # No usable row: allocate the next id from the numeric maximum.
            cursor.execute("SELECT COALESCE(MAX(CAST(id AS INTEGER)), 0) + 1 AS next_id FROM tracks")
            target_track_id = cursor.fetchone()["next_id"]
            cursor.execute(
                """
                INSERT INTO tracks (
                id, album_id, artist_id, title, track_number, disc_number, duration,
                file_path, bitrate, file_size, server_source, created_at, updated_at
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
                """,
                (
                    target_track_id,
                    album_id,
                    album_data.get("target_artist_id"),
                    expected_title,
                    api_track["track_number"],
                    api_track["disc_number"],
                    api_track["duration_ms"],
                    final_path,
                    bitrate,
                    file_size,
                    server_source,
                ),
            )
        # The column name comes from the injected service_id_columns mapping
        # (not from the request), which is why it is interpolated into the SQL.
        track_source_col = (deps.service_id_columns or {}).get(metadata_source, {}).get("track")
        if track_source_col and expected_track_id:
            try:
                cursor.execute(f"UPDATE tracks SET {track_source_col} = ? WHERE id = ?", (expected_track_id, target_track_id))
            except Exception as source_err:
                # Best effort: the row itself is still committed below.
                logger.debug("Imported track source-id update failed: %s", source_err)
        conn.commit()
        return target_track_id
def _ensure_disc_number_column(cursor, conn) -> None:
    """Add ``tracks.disc_number`` (INTEGER DEFAULT 1) when the table lacks it.

    Args:
        cursor: Cursor used for the schema inspection and the ALTER TABLE.
        conn: Connection the cursor belongs to; committed after a change.
    """
    cursor.execute("PRAGMA table_info(tracks)")
    # Index 1 of every table_info row is the column name.
    existing_columns = {column_info[1] for column_info in cursor.fetchall()}
    if "disc_number" in existing_columns:
        return
    cursor.execute("ALTER TABLE tracks ADD COLUMN disc_number INTEGER DEFAULT 1")
    conn.commit()
def ensure_tracks_disc_number_column(database) -> None:
    """Open a connection and make sure ``tracks.disc_number`` exists.

    Args:
        database: Object exposing ``_get_connection()`` as a context manager.
    """
    with database._get_connection() as connection:
        schema_cursor = connection.cursor()
        _ensure_disc_number_column(schema_cursor, connection)
def _read_file_stats(final_path: str, source_track: dict) -> tuple[Optional[int], int]:
    """Return ``(file_size, bitrate)`` for the processed file.

    The bitrate starts from the source track's stored value (0 when absent)
    and is replaced by the value mutagen reports for ``final_path``,
    converted from bit/s to kbit/s. Any failure is logged at debug level and
    whatever was gathered up to that point is returned.

    Args:
        final_path: Path of the post-processed audio file.
        source_track: Source track row; only ``bitrate`` is read.

    Returns:
        ``file_size`` in bytes (``None`` when it could not be read) and the
        bitrate.
    """
    file_size = None
    bitrate = source_track.get("bitrate") or 0
    try:
        file_size = os.path.getsize(final_path)
        from mutagen import File as MutagenFile

        audio = MutagenFile(final_path)
        # mutagen file objects are dict-like over their tags, so a file that
        # opened fine but carries no tags is falsy. Compare against None so
        # stream info is still read for tag-less files.
        if audio is not None and getattr(audio, "info", None) and getattr(audio.info, "bitrate", None):
            bitrate = int(audio.info.bitrate / 1000)
    except Exception as meta_err:
        logger.debug("Existing-track import metadata read failed: %s", meta_err)
    return file_size, bitrate
def _sync_imported_track(deps: MissingTrackImportDeps, track_id, expected_title: str, album_data: dict) -> None:
    """Push the imported track to the active media server, best effort.

    The sync callback is only invoked when one is configured and the active
    server is "jellyfin" or "navidrome". Every exception is logged at debug
    level and swallowed so a sync problem never fails the import.
    """
    try:
        server = deps.config_manager.get_active_media_server()
        sync_fn = deps.sync_tracks_to_server_fn
        if not sync_fn or server not in ("jellyfin", "navidrome"):
            return
        track_summary = {
            "id": track_id,
            "title": expected_title,
            "artist_name": album_data.get("artist_name"),
            "album_title": album_data.get("title"),
            "year": album_data.get("year"),
            "server_source": album_data.get("server_source"),
        }
        sync_fn([track_summary], server)
    except Exception as sync_err:
        logger.debug("Existing-track import server sync skipped/failed: %s", sync_err)
def copy_album_identity_from_target_sibling(
    database,
    album_id: str,
    final_path: str,
    target_disc: int,
    target_track: int,
    resolve_library_file_path_fn: Callable[[Optional[str]], Optional[str]],
) -> bool:
    """Copy album identity tags from another track of the album onto a file.

    Up to 12 tracks of the album that have a file path and are not the
    target slot itself are tried in disc/track order. The first sibling
    whose file exists, yields identity tags and can be written to
    ``final_path`` ends the search.

    Returns:
        ``True`` when tags were written from a sibling, ``False`` when no
        sibling worked or an error occurred (logged as a warning).
    """
    sibling_query = """
        SELECT file_path FROM tracks
        WHERE album_id = ?
        AND file_path IS NOT NULL
        AND file_path != ''
        AND NOT (COALESCE(disc_number, 1) = ? AND track_number = ?)
        ORDER BY COALESCE(disc_number, 1), track_number
        LIMIT 12
    """
    try:
        with database._get_connection() as conn:
            cur = conn.cursor()
            cur.execute(sibling_query, (album_id, target_disc, target_track))
            candidates = cur.fetchall()
            for candidate in candidates:
                candidate_path = resolve_library_file_path_fn(candidate["file_path"])
                if candidate_path and os.path.exists(candidate_path):
                    identity = read_album_identity_tags(candidate_path)
                    if identity and write_album_identity_tags(final_path, identity):
                        logger.info(
                            "Imported track inherited album identity tags from sibling: %s",
                            os.path.basename(candidate_path),
                        )
                        return True
    except Exception as exc:
        logger.warning("Failed to inherit album identity tags for imported track: %s", exc)
    return False
def read_album_identity_tags(file_path: str) -> dict:
    """Read the album identity tags of an audio file into a plain dict.

    Three container styles are handled: ID3 (dedicated frames plus ``TXXX``
    frames matched by description), MP4 (standard atoms plus
    ``----:com.apple.iTunes:`` freeform atoms) and everything else, where
    each key of ``_ALBUM_IDENTITY_TAGS`` is looked up directly.

    Returns:
        Mapping of tag key to non-empty string value. An empty dict is
        returned when the file cannot be opened, has nothing to offer, or
        any error occurs (logged at debug level).
    """
    try:
        from mutagen import File as MutagenFile
        from mutagen.id3 import ID3, TXXX
        from mutagen.mp4 import MP4

        audio = MutagenFile(file_path)
        if not audio:
            return {}

        found = {}
        container_tags = getattr(audio, "tags", None)
        if isinstance(container_tags, ID3):
            for tag_key, frame_id in _ID3_STANDARD_TAGS.items():
                matching = container_tags.getall(frame_id)
                if not matching or not getattr(matching[0], "text", None):
                    continue
                found[tag_key] = str(matching[0].text[0]).strip()

            # Reverse lookup: TXXX description -> tag key.
            key_for_desc = {desc: tag_key for tag_key, desc in _ID3_TXXX_DESCS.items()}
            for frame in container_tags.getall("TXXX"):
                if not isinstance(frame, TXXX):
                    continue
                if frame.desc in key_for_desc and frame.text:
                    found[key_for_desc[frame.desc]] = str(frame.text[0]).strip()
        else:
            if isinstance(audio, MP4):
                # Standard atoms first, then the freeform iTunes atoms.
                lookups = list(_MP4_STANDARD_TAGS.items()) + [
                    (tag_key, f"----:com.apple.iTunes:{desc}")
                    for tag_key, desc in _ID3_TXXX_DESCS.items()
                ]
            else:
                lookups = [(tag_key, tag_key) for tag_key in _ALBUM_IDENTITY_TAGS]
            for tag_key, lookup_key in lookups:
                value = _first_tag_value(audio, lookup_key)
                if value:
                    found[tag_key] = value

        return {tag_key: value for tag_key, value in found.items() if value}
    except Exception as exc:
        logger.debug("Failed reading album identity tags from %s: %s", file_path, exc)
        return {}
def write_album_identity_tags(file_path: str, tags: dict) -> bool:
    """Write album identity tags onto an audio file.

    Only keys listed in ``_ALBUM_IDENTITY_TAGS`` with a non-blank value are
    written. ID3 files get dedicated frames plus ``TXXX`` frames, MP4 files
    get standard atoms plus ``----:com.apple.iTunes:`` freeform atoms, and
    every other format receives the keys as-is. Existing values for the
    written keys are replaced.

    Args:
        file_path: Audio file to modify in place.
        tags: Mapping of tag key to value, e.g. the result of
            ``read_album_identity_tags``.

    Returns:
        ``True`` when the file was saved, ``False`` when there was nothing
        to write, the file could not be opened, or an error occurred (logged
        as a warning).
    """
    if not tags:
        return False
    try:
        from mutagen import File as MutagenFile
        from mutagen.id3 import ID3, TALB, TDRC, TPE2, TXXX
        from mutagen.mp4 import MP4, MP4FreeForm

        audio = MutagenFile(file_path)
        # mutagen file objects are dict-like over their tags and therefore
        # falsy when a file has no tags yet; only None means "not recognised".
        if audio is None:
            return False
        tags = {k: str(v).strip() for k, v in tags.items() if k in _ALBUM_IDENTITY_TAGS and str(v).strip()}
        if not tags:
            return False
        # Give tag-less files a container of their native kind so the branch
        # selection below sees e.g. an ID3 object for a bare MP3.
        if getattr(audio, "tags", None) is None:
            audio.add_tags()
        if isinstance(getattr(audio, "tags", None), ID3):
            standard_frames = {"TALB": TALB, "TPE2": TPE2, "TDRC": TDRC}
            written_standard = set()
            for tag_key, frame_id in _ID3_STANDARD_TAGS.items():
                value = tags.get(tag_key)
                # Alias keys share a frame; the first alias with a value wins.
                if not value or frame_id in written_standard:
                    continue
                audio.tags.delall(frame_id)
                audio.tags.add(standard_frames[frame_id](encoding=3, text=[value]))
                written_standard.add(frame_id)
            for tag_key, desc in _ID3_TXXX_DESCS.items():
                value = tags.get(tag_key)
                if not value:
                    continue
                # ID3 tag objects are mappings keyed by HashKey and have no
                # ``remove`` method; delete matching frames via ``del``.
                for existing in list(audio.tags.getall("TXXX")):
                    if getattr(existing, "desc", None) == desc:
                        del audio.tags[existing.HashKey]
                audio.tags.add(TXXX(encoding=3, desc=desc, text=[value]))
        elif isinstance(audio, MP4):
            for tag_key, mp4_key in _MP4_STANDARD_TAGS.items():
                if tags.get(tag_key):
                    audio[mp4_key] = [tags[tag_key]]
            for tag_key, desc in _ID3_TXXX_DESCS.items():
                if tags.get(tag_key):
                    audio[f"----:com.apple.iTunes:{desc}"] = [MP4FreeForm(tags[tag_key].encode("utf-8"))]
        else:
            for tag_key, value in tags.items():
                audio[tag_key] = [value]
        audio.save()
        return True
    except Exception as exc:
        logger.warning("Failed writing album identity tags to %s: %s", file_path, exc)
        return False
def _first_tag_value(audio, key: str) -> Optional[str]:
    """Return the first value stored under ``key`` as a stripped string.

    Lists and tuples contribute their first element; bytes are decoded as
    UTF-8 with undecodable bytes dropped. ``None`` is returned for missing,
    empty or whitespace-only values and when the lookup raises.
    """
    try:
        raw = audio.get(key)
        if not raw:
            return None
        first = raw[0] if isinstance(raw, (list, tuple)) else raw
        if isinstance(first, bytes):
            text = first.decode("utf-8", errors="ignore")
        else:
            text = str(first)
        return text.strip() or None
    except Exception:
        return None
def _expected_track_id(expected: dict) -> str:
    """Pick the first non-empty track identifier from the expected track.

    Fields are tried from the generic ones to the provider-specific ones;
    ``""`` is returned when none of them holds a truthy value.
    """
    id_fields = (
        "track_id",
        "id",
        "source_track_id",
        "spotify_track_id",
        "deezer_id",
        "itunes_track_id",
        "musicbrainz_recording_id",
    )
    for field in id_fields:
        candidate = expected.get(field)
        if candidate:
            return candidate
    return ""
def _album_source_id(payload: dict, expected: dict, album_data: dict, album_id: str) -> str:
    """Choose the album identifier to use for the metadata source.

    An explicit ``album_source_id`` in the payload wins, followed by the
    expected track's ``album_id``, then the provider ids stored on the album
    row in a fixed order. The library album id, as a string, is the last
    resort.
    """
    explicit = payload.get("album_source_id") or expected.get("album_id")
    if explicit:
        return explicit

    stored_id_columns = (
        "spotify_album_id",
        "deezer_id",
        "itunes_album_id",
        "musicbrainz_release_id",
        "discogs_id",
        "tidal_id",
        "qobuz_id",
    )
    for column in stored_id_columns:
        stored = album_data.get(column)
        if stored:
            return stored
    return str(album_id)
def _file_not_found_message(file_path: Optional[str]) -> str:
    """Build the error text for a source track whose file cannot be used.

    A track without any stored path gets a dedicated message; otherwise the
    unresolved path is included.
    """
    if not file_path:
        return "Selected library track does not have a file path"
    return f"File not found: {file_path}"