You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
SoulSync/tests/library/test_missing_track_import.py

261 lines
9.8 KiB

"""Tests for the Enhanced Library "I Have This" import service."""
from __future__ import annotations
import os
import shutil
import sqlite3
from dataclasses import dataclass
import pytest
from core.library import missing_track_import as mti
class _ConnCtx:
    """Context manager that hands out an already-open connection.

    Entering the context returns the wrapped connection unchanged. Leaving
    the context does not close or commit it, so the same connection can be
    reused by several ``with`` blocks and inspected afterwards.
    """

    def __init__(self, conn):
        """Store *conn*, the object returned from ``__enter__``."""
        self.conn = conn

    def __enter__(self):
        """Return the wrapped connection."""
        return self.conn

    def __exit__(self, exc_type, exc, tb):
        """Leave the connection open and let any exception propagate."""
        # A falsy return value tells Python not to suppress the exception.
        return False
class _FakeDB:
    """Minimal database stand-in backed by a single shared connection."""

    def __init__(self, conn):
        """Store *conn*, the connection every ``_get_connection`` call wraps."""
        self.conn = conn

    def _get_connection(self):
        """Return a ``_ConnCtx`` around the shared connection.

        Each call wraps the same underlying connection, so data written
        through one context is visible through the next.
        """
        return _ConnCtx(self.conn)
@dataclass
class _FakeConfig:
    """Config-manager stand-in exposing only the settings these tests set."""

    # Value returned for the "soulseek.download_path" key.
    download_path: str
    # Value returned by get_active_media_server().
    active_server: str = "navidrome"

    def get(self, key, default=None):
        """Return the download path for its key, otherwise *default*.

        Only ``"soulseek.download_path"`` is recognised; every other key
        falls through to *default*.
        """
        if key == "soulseek.download_path":
            return self.download_path
        return default

    def get_active_media_server(self):
        """Return the configured media server name."""
        return self.active_server
def _make_db(*, include_disc_number: bool = True) -> tuple[_FakeDB, sqlite3.Connection]:
    """Create an in-memory SQLite library seeded with one artist and two albums.

    Args:
        include_disc_number: When False, the ``tracks`` table is created
            without a ``disc_number`` column, giving tests a schema that
            lacks it.

    Returns:
        A ``(_FakeDB, connection)`` pair sharing the same connection, so
        tests can query rows directly through the connection.
    """
    conn = sqlite3.connect(":memory:")
    # Row factory allows column access by name in assertions.
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()
    cur.execute(
        """
        CREATE TABLE artists (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL
        )
        """
    )
    cur.execute(
        """
        CREATE TABLE albums (
            id TEXT PRIMARY KEY,
            artist_id TEXT NOT NULL,
            title TEXT NOT NULL,
            year INTEGER,
            track_count INTEGER,
            server_source TEXT,
            deezer_id TEXT,
            thumb_url TEXT
        )
        """
    )
    # Spliced into the column list below; empty when the column is omitted.
    disc_col = ", disc_number INTEGER DEFAULT 1" if include_disc_number else ""
    cur.execute(
        f"""
        CREATE TABLE tracks (
            id TEXT PRIMARY KEY,
            album_id TEXT NOT NULL,
            artist_id TEXT NOT NULL,
            title TEXT NOT NULL,
            track_number INTEGER{disc_col},
            duration INTEGER,
            file_path TEXT,
            bitrate INTEGER,
            file_size INTEGER,
            server_source TEXT,
            deezer_id TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
    )
    cur.execute("INSERT INTO artists (id, name) VALUES ('artist-1', 'Kendrick Lamar')")
    # 'album-basic' is the album the tests import into.
    cur.execute(
        """
        INSERT INTO albums (id, artist_id, title, year, track_count, server_source, deezer_id)
        VALUES ('album-basic', 'artist-1', 'DAMN.', 2017, 14, 'navidrome', '302127')
        """
    )
    # 'album-deluxe' holds the already-owned source track in the tests.
    cur.execute(
        """
        INSERT INTO albums (id, artist_id, title, year, track_count, server_source, deezer_id)
        VALUES ('album-deluxe', 'artist-1', 'DAMN. COLLECTORS EDITION', 2017, 14, 'navidrome', '999999')
        """
    )
    conn.commit()
    return _FakeDB(conn), conn
def _insert_track(conn, *, track_id, album_id, title, track_number, file_path, disc_number=1):
    """Insert one track row for ``artist-1`` and commit.

    The INSERT statement is chosen from the live schema: ``disc_number`` is
    written only when the ``tracks`` table has that column, so the helper
    works with both table layouts.

    Args:
        conn: Open SQLite connection containing a ``tracks`` table.
        track_id: Primary key for the new row.
        album_id: Album the track belongs to.
        title: Track title.
        track_number: Position of the track on its disc.
        file_path: Location of the audio file; stored as ``str(file_path)``.
        disc_number: Disc index; ignored when the column does not exist.
    """
    # PRAGMA table_info rows carry the column name at index 1.
    columns = [row[1] for row in conn.execute("PRAGMA table_info(tracks)").fetchall()]
    if "disc_number" in columns:
        conn.execute(
            """
            INSERT INTO tracks (id, album_id, artist_id, title, track_number, disc_number, duration, file_path, bitrate, file_size, server_source)
            VALUES (?, ?, 'artist-1', ?, ?, ?, 177000, ?, 900, 1234, 'navidrome')
            """,
            (track_id, album_id, title, track_number, disc_number, str(file_path)),
        )
    else:
        conn.execute(
            """
            INSERT INTO tracks (id, album_id, artist_id, title, track_number, duration, file_path, bitrate, file_size, server_source)
            VALUES (?, ?, 'artist-1', ?, ?, 177000, ?, 900, 1234, 'navidrome')
            """,
            (track_id, album_id, title, track_number, str(file_path)),
        )
    conn.commit()
def _deps(tmp_path, db, *, post_process_fn=None, sync_calls=None):
    """Build a ``MissingTrackImportDeps`` wired entirely to test doubles.

    Args:
        tmp_path: Base directory for the fake download and library folders.
        db: Database object handed to the service (a ``_FakeDB`` in these tests).
        post_process_fn: Optional replacement for the default post-processor.
        sync_calls: Optional list that receives a ``(rows, server)`` tuple for
            every media-server sync request; a private list is used when omitted.

    Returns:
        The populated ``mti.MissingTrackImportDeps`` instance.
    """
    sync_calls = sync_calls if sync_calls is not None else []

    def _default_post_process(_key, context, staged_path):
        """Copy the staged file into a fake library folder and record where."""
        final_dir = tmp_path / "Library" / "Kendrick Lamar - 2017 DAMN"
        final_dir.mkdir(parents=True, exist_ok=True)
        final_path = final_dir / "08 - HUMBLE [FLAC 16bit].flac"
        shutil.copy2(staged_path, final_path)
        # The destination is reported back through the mutable context dict.
        context["_final_processed_path"] = str(final_path)

    return mti.MissingTrackImportDeps(
        database=db,
        config_manager=_FakeConfig(str(tmp_path / "downloads")),
        post_process_fn=post_process_fn or _default_post_process,
        # Resolve only paths that actually exist on disk.
        resolve_library_file_path_fn=lambda path: str(path) if path and os.path.exists(path) else None,
        # Identity mapping: paths are returned unchanged.
        docker_resolve_path_fn=lambda path: path,
        sync_tracks_to_server_fn=lambda rows, server: sync_calls.append((rows, server)),
        service_id_columns={"deezer": {"track": "deezer_id"}},
    )
def _payload():
    """Return a fresh import request payload used by the import tests.

    The payload names ``deluxe-humble`` as the source track and describes
    the expected track (title "HUMBLE.", track 8, disc 1) with Deezer
    identifiers. A new dict is built on every call.
    """
    return {
        "source_track_id": "deluxe-humble",
        "album_source_id": "302127",
        "total_discs": 1,
        "expected_track": {
            "title": "HUMBLE.",
            "track_number": 8,
            "disc_number": 1,
            "duration": 177000,
            "source": "deezer",
            "track_id": "350171311",
            "deezer_id": "350171311",
            "artists": ["Kendrick Lamar"],
        },
    }
def test_import_existing_track_copies_file_and_writes_target_album_row(tmp_path, monkeypatch):
    """Importing an owned track keeps the source file and adds a row to the target album.

    Also checks that album identity tags read from the target album's sibling
    file are written to the imported file, and that a sync to the active
    media server is requested.
    """
    db, conn = _make_db(include_disc_number=True)

    source_file = tmp_path / "deluxe" / "08 - HUMBLE.flac"
    source_file.parent.mkdir()
    source_file.write_bytes(b"source audio")

    sibling_file = tmp_path / "basic" / "01 - BLOOD.flac"
    sibling_file.parent.mkdir()
    sibling_file.write_bytes(b"sibling audio")

    _insert_track(conn, track_id="basic-blood", album_id="album-basic", title="BLOOD.", track_number=1, file_path=sibling_file)
    _insert_track(conn, track_id="deluxe-humble", album_id="album-deluxe", title="HUMBLE.", track_number=8, file_path=source_file)

    inherited = []
    # Only the target album's sibling yields identity tags.
    monkeypatch.setattr(mti, "read_album_identity_tags", lambda path: {"musicbrainz_albumid": "target-release"} if path == str(sibling_file) else {})
    # ``append`` returns None, so ``or True`` makes the stub report success.
    monkeypatch.setattr(mti, "write_album_identity_tags", lambda path, tags: inherited.append((path, tags)) or True)

    sync_calls = []
    result = mti.import_existing_track_for_album_slot("album-basic", _payload(), _deps(tmp_path, db, sync_calls=sync_calls))

    # The source file must be left untouched.
    assert source_file.read_bytes() == b"source audio"
    assert os.path.exists(result["final_path"])
    assert inherited == [(result["final_path"], {"musicbrainz_albumid": "target-release"})]

    row = conn.execute("SELECT * FROM tracks WHERE album_id = 'album-basic' AND track_number = 8").fetchone()
    assert row is not None
    assert row["title"] == "HUMBLE."
    assert row["disc_number"] == 1
    assert row["file_path"] == result["final_path"]
    assert row["deezer_id"] == "350171311"
    assert sync_calls and sync_calls[0][1] == "navidrome"
def test_import_adds_disc_number_column_for_older_track_tables(tmp_path, monkeypatch):
    """Importing into a ``tracks`` table without ``disc_number`` adds the column.

    The table starts without the column; after the import the column must
    exist, the new row must carry disc 1, and album identity tags must still
    have been written.
    """
    db, conn = _make_db(include_disc_number=False)

    source_file = tmp_path / "deluxe" / "08 - HUMBLE.flac"
    source_file.parent.mkdir()
    source_file.write_bytes(b"source audio")

    sibling_file = tmp_path / "basic" / "01 - BLOOD.flac"
    sibling_file.parent.mkdir()
    sibling_file.write_bytes(b"sibling audio")

    _insert_track(conn, track_id="basic-blood", album_id="album-basic", title="BLOOD.", track_number=1, file_path=sibling_file)
    _insert_track(conn, track_id="deluxe-humble", album_id="album-deluxe", title="HUMBLE.", track_number=8, file_path=source_file)

    write_calls = []
    # Only the target album's sibling yields identity tags.
    monkeypatch.setattr(mti, "read_album_identity_tags", lambda path: {"musicbrainz_albumid": "target-release"} if path == str(sibling_file) else {})
    # ``append`` returns None, so ``or True`` makes the stub report success.
    monkeypatch.setattr(mti, "write_album_identity_tags", lambda path, tags: write_calls.append((path, tags)) or True)

    result = mti.import_existing_track_for_album_slot("album-basic", _payload(), _deps(tmp_path, db))

    columns = [row[1] for row in conn.execute("PRAGMA table_info(tracks)").fetchall()]
    assert "disc_number" in columns

    row = conn.execute("SELECT title, disc_number, file_path FROM tracks WHERE album_id = 'album-basic' AND track_number = 8").fetchone()
    # Fail with a clear assertion, not a TypeError, when no row was written.
    assert row is not None
    assert row["title"] == "HUMBLE."
    assert row["disc_number"] == 1
    assert row["file_path"] == result["final_path"]
    assert write_calls, "album identity inheritance should still run after old DB migration"
def test_copy_album_identity_uses_target_sibling_and_leaves_track_tags_to_imported_file(tmp_path, monkeypatch):
    """Album identity tags read from a sibling are written once to the imported file.

    With a single sibling track in the target album, the helper must report
    success and issue exactly one tag write, addressed to the imported file
    and carrying the tags the reader stub returned.
    """
    db, conn = _make_db(include_disc_number=True)

    sibling_file = tmp_path / "basic" / "01 - BLOOD.flac"
    sibling_file.parent.mkdir()
    sibling_file.write_bytes(b"sibling")

    # The imported file sits in the same folder as the sibling.
    final_file = tmp_path / "basic" / "08 - HUMBLE.flac"
    final_file.write_bytes(b"imported")

    # Only the sibling gets a database row.
    _insert_track(conn, track_id="basic-blood", album_id="album-basic", title="BLOOD.", track_number=1, file_path=sibling_file)

    monkeypatch.setattr(mti, "read_album_identity_tags", lambda path: {"musicbrainz_albumid": "target-release", "barcode": "target-barcode"})
    writes = []
    # ``append`` returns None, so ``or True`` makes the stub report success.
    monkeypatch.setattr(mti, "write_album_identity_tags", lambda path, tags: writes.append((path, tags)) or True)

    copied = mti.copy_album_identity_from_target_sibling(
        db,
        "album-basic",
        str(final_file),
        1,
        8,
        lambda path: str(path) if os.path.exists(path) else None,
    )

    assert copied is True
    assert writes == [(str(final_file), {"musicbrainz_albumid": "target-release", "barcode": "target-barcode"})]
def test_import_rejects_missing_expected_track_context(tmp_path):
    """An empty ``expected_track`` is rejected with a 400 error naming the field."""
    db, _conn = _make_db(include_disc_number=True)

    with pytest.raises(mti.MissingTrackImportError) as exc_info:
        mti.import_existing_track_for_album_slot("album-basic", {"source_track_id": "x", "expected_track": {}}, _deps(tmp_path, db))

    # Inspect the exception only after the ``with`` block has captured it.
    assert exc_info.value.status_code == 400
    assert "expected_track" in str(exc_info.value)