mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
261 lines
9.8 KiB
261 lines
9.8 KiB
"""Tests for the Enhanced Library "I Have This" import service."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import shutil
|
|
import sqlite3
|
|
from dataclasses import dataclass
|
|
|
|
import pytest
|
|
|
|
from core.library import missing_track_import as mti
|
|
|
|
|
|
class _ConnCtx:
    """Context manager that hands back an already-open connection.

    Entering yields the wrapped connection unchanged. Leaving does nothing
    to the connection and never suppresses an exception.
    """

    def __init__(self, conn):
        self.conn = conn

    def __enter__(self):
        connection = self.conn
        return connection

    def __exit__(self, exc_type, exc, tb):
        # A falsy return lets any exception raised in the ``with`` body propagate.
        suppress_exception = False
        return suppress_exception
|
|
|
|
|
|
class _FakeDB:
    """Database stand-in that exposes only ``_get_connection``.

    Each call wraps the connection given at construction in a new
    ``_ConnCtx``.
    """

    def __init__(self, conn):
        self.conn = conn

    def _get_connection(self):
        wrapped = _ConnCtx(self.conn)
        return wrapped
|
|
|
|
|
|
@dataclass
class _FakeConfig:
    """Config stand-in holding a download path and an active server name."""

    download_path: str
    active_server: str = "navidrome"

    def get(self, key, default=None):
        """Return the download path for ``soulseek.download_path``, else ``default``."""
        # Only one key is recognised; every other lookup falls through to the default.
        return self.download_path if key == "soulseek.download_path" else default

    def get_active_media_server(self):
        """Return the configured active server name."""
        server = self.active_server
        return server
|
|
|
|
|
|
def _make_db(*, include_disc_number: bool = True) -> tuple[_FakeDB, sqlite3.Connection]:
    """Build an in-memory SQLite database seeded with one artist and two albums.

    Creates the ``artists``, ``albums`` and ``tracks`` tables, inserts the
    artist ``artist-1`` plus the albums ``album-basic`` and ``album-deluxe``,
    and commits. No track rows are inserted.

    Args:
        include_disc_number: When true, ``tracks`` gets a
            ``disc_number INTEGER DEFAULT 1`` column; when false the column
            is left out of the table definition.

    Returns:
        A ``(_FakeDB, connection)`` pair sharing the same connection, whose
        ``row_factory`` is ``sqlite3.Row``.
    """
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row

    # Optional column fragment spliced into the tracks DDL below.
    disc_col = ""
    if include_disc_number:
        disc_col = ", disc_number INTEGER DEFAULT 1"

    create_artists = """
        CREATE TABLE artists (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL
        )
        """
    create_albums = """
        CREATE TABLE albums (
            id TEXT PRIMARY KEY,
            artist_id TEXT NOT NULL,
            title TEXT NOT NULL,
            year INTEGER,
            track_count INTEGER,
            server_source TEXT,
            deezer_id TEXT,
            thumb_url TEXT
        )
        """
    create_tracks = f"""
        CREATE TABLE tracks (
            id TEXT PRIMARY KEY,
            album_id TEXT NOT NULL,
            artist_id TEXT NOT NULL,
            title TEXT NOT NULL,
            track_number INTEGER{disc_col},
            duration INTEGER,
            file_path TEXT,
            bitrate INTEGER,
            file_size INTEGER,
            server_source TEXT,
            deezer_id TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
    seed_artist = "INSERT INTO artists (id, name) VALUES ('artist-1', 'Kendrick Lamar')"
    seed_basic_album = """
        INSERT INTO albums (id, artist_id, title, year, track_count, server_source, deezer_id)
        VALUES ('album-basic', 'artist-1', 'DAMN.', 2017, 14, 'navidrome', '302127')
        """
    seed_deluxe_album = """
        INSERT INTO albums (id, artist_id, title, year, track_count, server_source, deezer_id)
        VALUES ('album-deluxe', 'artist-1', 'DAMN. COLLECTORS EDITION', 2017, 14, 'navidrome', '999999')
        """

    # Schema first, then seed rows, all through one cursor.
    statements = (
        create_artists,
        create_albums,
        create_tracks,
        seed_artist,
        seed_basic_album,
        seed_deluxe_album,
    )
    cur = conn.cursor()
    for statement in statements:
        cur.execute(statement)
    conn.commit()

    return _FakeDB(conn), conn
|
|
|
|
|
|
def _insert_track(conn, *, track_id, album_id, title, track_number, file_path, disc_number=1):
    """Insert one track row owned by ``artist-1`` and commit.

    The ``tracks`` table is inspected with ``PRAGMA table_info`` first. When
    it has a ``disc_number`` column the given ``disc_number`` is stored;
    otherwise that argument is ignored and the column is left out of the
    INSERT. ``file_path`` is stored as ``str(file_path)``; duration, bitrate,
    file size and server source are fixed values.
    """
    table_info = conn.execute("PRAGMA table_info(tracks)").fetchall()
    # Index 1 of each table_info row is the column name.
    has_disc_number = any(info[1] == "disc_number" for info in table_info)
    path_text = str(file_path)

    if has_disc_number:
        sql = """
            INSERT INTO tracks (id, album_id, artist_id, title, track_number, disc_number, duration, file_path, bitrate, file_size, server_source)
            VALUES (?, ?, 'artist-1', ?, ?, ?, 177000, ?, 900, 1234, 'navidrome')
            """
        params = (track_id, album_id, title, track_number, disc_number, path_text)
    else:
        sql = """
            INSERT INTO tracks (id, album_id, artist_id, title, track_number, duration, file_path, bitrate, file_size, server_source)
            VALUES (?, ?, 'artist-1', ?, ?, 177000, ?, 900, 1234, 'navidrome')
            """
        params = (track_id, album_id, title, track_number, path_text)

    conn.execute(sql, params)
    conn.commit()
|
|
|
|
|
|
def _deps(tmp_path, db, *, post_process_fn=None, sync_calls=None):
    """Assemble a ``MissingTrackImportDeps`` wired to fakes rooted at ``tmp_path``.

    Args:
        tmp_path: Base directory; the fake config's download path is
            ``tmp_path / "downloads"`` and the default post-processor writes
            under ``tmp_path / "Library"``.
        db: Object passed through as the ``database`` dependency.
        post_process_fn: Optional replacement for the default post-processor;
            a falsy value selects the default.
        sync_calls: Optional list that receives a ``(rows, server)`` tuple for
            every sync call; a private list is used when omitted.
    """
    if sync_calls is None:
        sync_calls = []

    def _default_post_process(_key, context, staged_path):
        # Copy the staged file to a fixed library location and report that
        # location back through the shared context dict.
        final_path = (
            tmp_path
            / "Library"
            / "Kendrick Lamar - 2017 DAMN"
            / "08 - HUMBLE [FLAC 16bit].flac"
        )
        final_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(staged_path, final_path)
        context["_final_processed_path"] = str(final_path)

    def _resolve_library_file_path(path):
        # Only paths that exist on disk resolve; everything else is None.
        if path and os.path.exists(path):
            return str(path)
        return None

    def _docker_resolve_path(path):
        return path

    def _record_sync(rows, server):
        sync_calls.append((rows, server))

    chosen_post_process = post_process_fn if post_process_fn else _default_post_process
    download_dir = str(tmp_path / "downloads")

    return mti.MissingTrackImportDeps(
        database=db,
        config_manager=_FakeConfig(download_dir),
        post_process_fn=chosen_post_process,
        resolve_library_file_path_fn=_resolve_library_file_path,
        docker_resolve_path_fn=_docker_resolve_path,
        sync_tracks_to_server_fn=_record_sync,
        service_id_columns={"deezer": {"track": "deezer_id"}},
    )
|
|
|
|
|
|
def _payload():
    """Return a fresh import request payload for the HUMBLE. track slot.

    A new dict (with a new nested ``expected_track`` dict) is built on every
    call, so callers may mutate the result freely.
    """
    expected_track = {
        "title": "HUMBLE.",
        "track_number": 8,
        "disc_number": 1,
        "duration": 177000,
        "source": "deezer",
        "track_id": "350171311",
        "deezer_id": "350171311",
        "artists": ["Kendrick Lamar"],
    }
    payload = {
        "source_track_id": "deluxe-humble",
        "album_source_id": "302127",
        "total_discs": 1,
        "expected_track": expected_track,
    }
    return payload
|
|
|
|
|
|
def test_import_existing_track_copies_file_and_writes_target_album_row(tmp_path, monkeypatch):
    """Import a track owned by ``album-deluxe`` into the ``album-basic`` slot.

    Asserts that the source file is left untouched, the reported final path
    exists, the sibling's album identity tags are written to the final file,
    a matching row appears under ``album-basic``, and a sync call is recorded
    for the ``navidrome`` server.
    """
    db, conn = _make_db(include_disc_number=True)

    # On-disk fixtures: the track to import and an existing target-album sibling.
    deluxe_dir = tmp_path / "deluxe"
    source_file = deluxe_dir / "08 - HUMBLE.flac"
    deluxe_dir.mkdir()
    source_file.write_bytes(b"source audio")

    basic_dir = tmp_path / "basic"
    sibling_file = basic_dir / "01 - BLOOD.flac"
    basic_dir.mkdir()
    sibling_file.write_bytes(b"sibling audio")

    _insert_track(
        conn,
        track_id="basic-blood",
        album_id="album-basic",
        title="BLOOD.",
        track_number=1,
        file_path=sibling_file,
    )
    _insert_track(
        conn,
        track_id="deluxe-humble",
        album_id="album-deluxe",
        title="HUMBLE.",
        track_number=8,
        file_path=source_file,
    )

    inherited = []

    def _fake_read(path):
        # Only the sibling file carries album identity tags.
        if path == str(sibling_file):
            return {"musicbrainz_albumid": "target-release"}
        return {}

    def _fake_write(path, tags):
        inherited.append((path, tags))
        return True

    monkeypatch.setattr(mti, "read_album_identity_tags", _fake_read)
    monkeypatch.setattr(mti, "write_album_identity_tags", _fake_write)

    sync_calls = []
    deps = _deps(tmp_path, db, sync_calls=sync_calls)
    result = mti.import_existing_track_for_album_slot("album-basic", _payload(), deps)
    final_path = result["final_path"]

    assert source_file.read_bytes() == b"source audio"
    assert os.path.exists(final_path)
    assert inherited == [(final_path, {"musicbrainz_albumid": "target-release"})]

    row = conn.execute(
        "SELECT * FROM tracks WHERE album_id = 'album-basic' AND track_number = 8"
    ).fetchone()
    assert row is not None
    assert row["title"] == "HUMBLE."
    assert row["disc_number"] == 1
    assert row["file_path"] == final_path
    assert row["deezer_id"] == "350171311"

    assert sync_calls
    assert sync_calls[0][1] == "navidrome"
|
|
|
|
|
|
def test_import_adds_disc_number_column_for_older_track_tables(tmp_path, monkeypatch):
    """Run the import against a ``tracks`` table created without ``disc_number``.

    Asserts that after the import the table has a ``disc_number`` column,
    that the new ``album-basic`` row carries title, disc number and final
    path, and that album identity tags were still written.
    """
    db, conn = _make_db(include_disc_number=False)

    source_file = tmp_path / "deluxe" / "08 - HUMBLE.flac"
    source_file.parent.mkdir()
    source_file.write_bytes(b"source audio")

    sibling_file = tmp_path / "basic" / "01 - BLOOD.flac"
    sibling_file.parent.mkdir()
    sibling_file.write_bytes(b"sibling audio")

    _insert_track(conn, track_id="basic-blood", album_id="album-basic", title="BLOOD.", track_number=1, file_path=sibling_file)
    _insert_track(conn, track_id="deluxe-humble", album_id="album-deluxe", title="HUMBLE.", track_number=8, file_path=source_file)

    write_calls = []
    # Only the sibling file reports album identity tags; writes are recorded.
    monkeypatch.setattr(mti, "read_album_identity_tags", lambda path: {"musicbrainz_albumid": "target-release"} if path == str(sibling_file) else {})
    monkeypatch.setattr(mti, "write_album_identity_tags", lambda path, tags: write_calls.append((path, tags)) or True)

    result = mti.import_existing_track_for_album_slot("album-basic", _payload(), _deps(tmp_path, db))

    columns = [row[1] for row in conn.execute("PRAGMA table_info(tracks)").fetchall()]
    assert "disc_number" in columns

    row = conn.execute("SELECT title, disc_number, file_path FROM tracks WHERE album_id = 'album-basic' AND track_number = 8").fetchone()
    # fetchone() yields None when no row matched; fail with a clear assertion
    # instead of a TypeError from subscripting None.
    assert row is not None
    assert row["title"] == "HUMBLE."
    assert row["disc_number"] == 1
    assert row["file_path"] == result["final_path"]
    assert write_calls, "album identity inheritance should still run after old DB migration"
|
|
|
|
|
|
def test_copy_album_identity_uses_target_sibling_and_leaves_track_tags_to_imported_file(tmp_path, monkeypatch):
    """Call ``copy_album_identity_from_target_sibling`` directly.

    Asserts that it returns ``True`` and performs exactly one tag write,
    addressed to the imported file and carrying the tags returned by the
    patched reader.
    """
    db, conn = _make_db(include_disc_number=True)

    album_dir = tmp_path / "basic"
    sibling_file = album_dir / "01 - BLOOD.flac"
    album_dir.mkdir()
    sibling_file.write_bytes(b"sibling")
    final_file = album_dir / "08 - HUMBLE.flac"
    final_file.write_bytes(b"imported")

    _insert_track(
        conn,
        track_id="basic-blood",
        album_id="album-basic",
        title="BLOOD.",
        track_number=1,
        file_path=sibling_file,
    )

    writes = []

    def _fake_read(path):
        # A fresh dict on every call, whatever the path.
        return {"musicbrainz_albumid": "target-release", "barcode": "target-barcode"}

    def _fake_write(path, tags):
        writes.append((path, tags))
        return True

    def _resolve_existing(path):
        if os.path.exists(path):
            return str(path)
        return None

    monkeypatch.setattr(mti, "read_album_identity_tags", _fake_read)
    monkeypatch.setattr(mti, "write_album_identity_tags", _fake_write)

    copied = mti.copy_album_identity_from_target_sibling(
        db,
        "album-basic",
        str(final_file),
        1,
        8,
        _resolve_existing,
    )

    assert copied is True
    expected_tags = {"musicbrainz_albumid": "target-release", "barcode": "target-barcode"}
    assert writes == [(str(final_file), expected_tags)]
|
|
|
|
|
|
def test_import_rejects_missing_expected_track_context(tmp_path):
    """A payload with an empty ``expected_track`` must raise ``MissingTrackImportError``.

    Asserts that the error has ``status_code`` 400 and that its message
    mentions ``expected_track``.
    """
    db, _conn = _make_db(include_disc_number=True)
    incomplete_payload = {"source_track_id": "x", "expected_track": {}}

    with pytest.raises(mti.MissingTrackImportError) as exc:
        mti.import_existing_track_for_album_slot(
            "album-basic",
            incomplete_payload,
            _deps(tmp_path, db),
        )

    error = exc.value
    assert error.status_code == 400
    assert "expected_track" in str(error)
|