mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
848 lines
31 KiB
848 lines
31 KiB
"""Unit tests for core/amazon_download_client.py.
|
|
|
|
All network I/O and subprocess calls are mocked.
|
|
No real T2Tunes instance or ffmpeg binary required.
|
|
|
|
Run from project root:
|
|
python -m pytest tests/tools/test_amazon_download_client.py -v
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import io
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Iterator, List, Optional
|
|
from unittest.mock import AsyncMock, MagicMock, call, patch
|
|
|
|
import pytest
|
|
|
|
ROOT = Path(__file__).resolve().parents[2]
|
|
if str(ROOT) not in sys.path:
|
|
sys.path.insert(0, str(ROOT))
|
|
|
|
from core.amazon_client import AmazonClientError, T2TunesStreamInfo
|
|
from core.amazon_download_client import (
|
|
AmazonDownloadClient,
|
|
MIN_AUDIO_BYTES,
|
|
_codec_key,
|
|
_file_extension,
|
|
_quality_label,
|
|
)
|
|
from core.download_plugins.types import AlbumResult, DownloadStatus, TrackResult
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers / fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def run(coro):
|
|
return asyncio.run(coro)
|
|
|
|
|
|
def _stream_info(
|
|
*,
|
|
asin: str = "B09XYZ1234",
|
|
streamable: bool = True,
|
|
codec: str = "FLAC",
|
|
sample_rate: int = 44100,
|
|
stream_url: str = "https://cdn.example.com/track.enc.flac",
|
|
decryption_key: Optional[str] = "deadbeef1234",
|
|
) -> T2TunesStreamInfo:
|
|
return T2TunesStreamInfo(
|
|
asin=asin,
|
|
streamable=streamable,
|
|
codec=codec,
|
|
format=codec,
|
|
sample_rate=sample_rate,
|
|
stream_url=stream_url,
|
|
decryption_key=decryption_key,
|
|
title="Not Like Us",
|
|
artist="Kendrick Lamar",
|
|
album="GNX",
|
|
isrc="USRC12345678",
|
|
)
|
|
|
|
|
|
def _search_items(n_tracks: int = 2, n_albums: int = 1):
|
|
from core.amazon_client import T2TunesSearchItem
|
|
items = [
|
|
T2TunesSearchItem(
|
|
asin=f"B0TRACK{i}",
|
|
title=f"Track {i}",
|
|
artist_name="Kendrick Lamar",
|
|
item_type="MusicTrack",
|
|
album_name="GNX",
|
|
album_asin="B0ALBUM1",
|
|
duration_seconds=200 + i * 10,
|
|
isrc=f"USRC{i:08d}",
|
|
)
|
|
for i in range(n_tracks)
|
|
]
|
|
items += [
|
|
T2TunesSearchItem(
|
|
asin=f"B0ALBUM{j}",
|
|
title=f"Album {j}",
|
|
artist_name="Kendrick Lamar",
|
|
item_type="MusicAlbum",
|
|
album_name=f"Album {j}",
|
|
album_asin=f"B0ALBUM{j}",
|
|
duration_seconds=0,
|
|
)
|
|
for j in range(n_albums)
|
|
]
|
|
return items
|
|
|
|
|
|
def _make_client(tmp_path: Path) -> AmazonDownloadClient:
|
|
with patch("core.amazon_download_client.config_manager") as cfg:
|
|
cfg.get.return_value = str(tmp_path)
|
|
with patch("core.amazon_client.config_manager") as cfg2:
|
|
cfg2.get.return_value = ""
|
|
client = AmazonDownloadClient(download_path=str(tmp_path))
|
|
return client
|
|
|
|
|
|
def _fake_chunked_response(data: bytes, status_code: int = 200) -> MagicMock:
|
|
resp = MagicMock()
|
|
resp.status_code = status_code
|
|
resp.ok = status_code < 400
|
|
resp.headers = {"content-length": str(len(data))}
|
|
|
|
chunk_size = 4096
|
|
chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)] or [b""]
|
|
|
|
def iter_content(chunk_size=None):
|
|
yield from chunks
|
|
|
|
resp.iter_content = iter_content
|
|
if status_code >= 400:
|
|
from requests import HTTPError
|
|
resp.raise_for_status.side_effect = HTTPError(response=resp)
|
|
else:
|
|
resp.raise_for_status.return_value = None
|
|
return resp
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Codec / quality helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCodecHelpers:
|
|
def test_codec_key_lowercases(self):
|
|
assert _codec_key("FLAC") == "flac"
|
|
assert _codec_key("OGG-Vorbis") == "ogg_vorbis"
|
|
assert _codec_key("EAC3") == "eac3"
|
|
|
|
def test_file_extension_known_codecs(self):
|
|
assert _file_extension("FLAC") == "flac"
|
|
assert _file_extension("ogg_vorbis") == "ogg"
|
|
assert _file_extension("opus") == "opus"
|
|
assert _file_extension("eac3") == "eac3"
|
|
assert _file_extension("mp4") == "m4a"
|
|
assert _file_extension("aac") == "m4a"
|
|
assert _file_extension("mp3") == "mp3"
|
|
|
|
def test_file_extension_unknown_falls_back(self):
|
|
assert _file_extension("wtf_codec") == "bin"
|
|
|
|
def test_quality_label_flac_lossless(self):
|
|
assert _quality_label("flac", 44100) == "Lossless"
|
|
assert _quality_label("FLAC", 48000) == "Lossless"
|
|
|
|
def test_quality_label_flac_hires(self):
|
|
assert _quality_label("flac", 96000) == "Hi-Res"
|
|
assert _quality_label("flac", 192000) == "Hi-Res"
|
|
|
|
def test_quality_label_lossy(self):
|
|
assert _quality_label("opus") == "Lossy"
|
|
assert _quality_label("eac3") == "Lossy"
|
|
assert _quality_label("mp3") == "Lossy"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# is_configured / check_connection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestIsConfigured:
|
|
def test_always_true(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
assert client.is_configured() is True
|
|
|
|
|
|
class TestCheckConnection:
|
|
def test_true_when_client_authenticated(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
client._client = MagicMock()
|
|
client._client.is_authenticated.return_value = True
|
|
assert run(client.check_connection()) is True
|
|
|
|
def test_false_when_client_down(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
client._client = MagicMock()
|
|
client._client.is_authenticated.return_value = False
|
|
assert run(client.check_connection()) is False
|
|
|
|
def test_false_on_exception(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
client._client = MagicMock()
|
|
client._client.is_authenticated.side_effect = Exception("timeout")
|
|
assert run(client.check_connection()) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# search()
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSearch:
|
|
def test_returns_track_results(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
client._client = MagicMock()
|
|
client._client.search_raw.return_value = _search_items(n_tracks=2, n_albums=0)
|
|
client._client.preferred_codec = "flac"
|
|
|
|
tracks, albums = run(client.search("Kendrick Lamar"))
|
|
|
|
assert len(tracks) == 2
|
|
assert len(albums) == 0
|
|
assert all(isinstance(t, TrackResult) for t in tracks)
|
|
|
|
def test_track_fields(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
client._client = MagicMock()
|
|
client._client.search_raw.return_value = _search_items(n_tracks=1, n_albums=0)
|
|
client._client.preferred_codec = "flac"
|
|
|
|
tracks, _ = run(client.search("Not Like Us"))
|
|
t = tracks[0]
|
|
|
|
assert t.username == "amazon"
|
|
assert "B0TRACK0" in t.filename
|
|
assert "||" in t.filename
|
|
assert t.artist == "Kendrick Lamar"
|
|
assert t.title == "Track 0"
|
|
assert t.album == "GNX"
|
|
assert t.quality == "Lossless"
|
|
assert t.duration == 200_000
|
|
|
|
def test_track_source_metadata(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
client._client = MagicMock()
|
|
client._client.search_raw.return_value = _search_items(n_tracks=1, n_albums=0)
|
|
client._client.preferred_codec = "flac"
|
|
|
|
tracks, _ = run(client.search("test"))
|
|
meta = tracks[0]._source_metadata
|
|
|
|
assert meta["asin"] == "B0TRACK0"
|
|
assert meta["album_asin"] == "B0ALBUM1"
|
|
assert "isrc" in meta
|
|
|
|
def test_returns_album_results(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
client._client = MagicMock()
|
|
client._client.search_raw.return_value = _search_items(n_tracks=0, n_albums=2)
|
|
client._client.preferred_codec = "flac"
|
|
|
|
_, albums = run(client.search("GNX"))
|
|
|
|
assert len(albums) == 2
|
|
assert all(isinstance(a, AlbumResult) for a in albums)
|
|
|
|
def test_album_deduplication(self, tmp_path):
|
|
from core.amazon_client import T2TunesSearchItem
|
|
client = _make_client(tmp_path)
|
|
client._client = MagicMock()
|
|
client._client.preferred_codec = "flac"
|
|
# Two hits with same album_asin
|
|
dup = T2TunesSearchItem(
|
|
asin="B0ALBUM0",
|
|
title="GNX",
|
|
artist_name="Kendrick Lamar",
|
|
item_type="MusicAlbum",
|
|
album_asin="B0ALBUM0",
|
|
)
|
|
client._client.search_raw.return_value = [dup, dup]
|
|
|
|
_, albums = run(client.search("GNX"))
|
|
assert len(albums) == 1
|
|
|
|
def test_returns_empty_on_error(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
client._client = MagicMock()
|
|
client._client.search_raw.side_effect = AmazonClientError("fail")
|
|
|
|
tracks, albums = run(client.search("anything"))
|
|
assert tracks == []
|
|
assert albums == []
|
|
|
|
def test_soulseek_compat_fields(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
client._client = MagicMock()
|
|
client._client.search_raw.return_value = _search_items(n_tracks=1, n_albums=0)
|
|
client._client.preferred_codec = "flac"
|
|
|
|
tracks, _ = run(client.search("test"))
|
|
t = tracks[0]
|
|
|
|
assert t.free_upload_slots == 999
|
|
assert t.upload_speed == 999_999
|
|
assert t.queue_length == 0
|
|
assert t.size == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _unique_path
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUniquePath:
|
|
def test_returns_original_when_no_conflict(self, tmp_path):
|
|
p = tmp_path / "track.flac"
|
|
result = AmazonDownloadClient._unique_path(p)
|
|
assert result == p
|
|
|
|
def test_appends_counter_on_conflict(self, tmp_path):
|
|
p = tmp_path / "track.flac"
|
|
p.touch()
|
|
result = AmazonDownloadClient._unique_path(p)
|
|
assert result != p
|
|
assert "(1)" in result.name
|
|
|
|
def test_increments_counter(self, tmp_path):
|
|
p = tmp_path / "track.flac"
|
|
p.touch()
|
|
(tmp_path / "track (1).flac").touch()
|
|
result = AmazonDownloadClient._unique_path(p)
|
|
assert "(2)" in result.name
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _record_to_status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestRecordToStatus:
|
|
def test_fields_mapped(self):
|
|
rec = {
|
|
"id": "dl-001",
|
|
"filename": "B1||Artist - Title",
|
|
"state": "downloading",
|
|
"progress": 0.5,
|
|
"size": 10_000_000,
|
|
"transferred": 5_000_000,
|
|
"speed": 1_000_000,
|
|
"time_remaining": 5,
|
|
"file_path": "/tmp/track.flac",
|
|
}
|
|
status = AmazonDownloadClient._record_to_status(rec)
|
|
|
|
assert status.id == "dl-001"
|
|
assert status.filename == "B1||Artist - Title"
|
|
assert status.username == "amazon"
|
|
assert status.state == "downloading"
|
|
assert status.progress == 0.5
|
|
assert status.size == 10_000_000
|
|
assert status.transferred == 5_000_000
|
|
assert status.speed == 1_000_000
|
|
assert status.time_remaining == 5
|
|
assert status.file_path == "/tmp/track.flac"
|
|
|
|
def test_defaults_for_missing_fields(self):
|
|
status = AmazonDownloadClient._record_to_status({})
|
|
assert status.id == ""
|
|
assert status.state == "queued"
|
|
assert status.progress == 0.0
|
|
assert status.size == 0
|
|
assert status.transferred == 0
|
|
assert status.speed == 0
|
|
assert status.time_remaining is None
|
|
assert status.file_path is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _stream_to_file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStreamToFile:
|
|
def test_writes_file_and_returns_size(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
data = b"X" * (MIN_AUDIO_BYTES + 1024)
|
|
client.session = MagicMock()
|
|
client.session.get.return_value = _fake_chunked_response(data)
|
|
|
|
out = tmp_path / "output.flac"
|
|
downloaded = client._stream_to_file("https://example.com/t.flac", out, "dl-001")
|
|
|
|
assert downloaded == len(data)
|
|
assert out.exists()
|
|
assert out.read_bytes() == data
|
|
|
|
def test_raises_on_http_error(self, tmp_path):
|
|
from requests import HTTPError
|
|
client = _make_client(tmp_path)
|
|
client.session = MagicMock()
|
|
client.session.get.return_value = _fake_chunked_response(b"", status_code=403)
|
|
|
|
out = tmp_path / "output.flac"
|
|
with pytest.raises(HTTPError):
|
|
client._stream_to_file("https://example.com/t.flac", out, "dl-001")
|
|
|
|
def test_respects_shutdown_check(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
data = b"X" * (MIN_AUDIO_BYTES + 1024)
|
|
client.session = MagicMock()
|
|
client.session.get.return_value = _fake_chunked_response(data)
|
|
client.shutdown_check = lambda: True # trigger immediately
|
|
|
|
out = tmp_path / "output.flac"
|
|
with pytest.raises(RuntimeError, match="Shutdown"):
|
|
client._stream_to_file("https://example.com/t.flac", out, "dl-001")
|
|
assert not out.exists()
|
|
|
|
def test_updates_engine_progress(self, tmp_path):
|
|
import itertools
|
|
client = _make_client(tmp_path)
|
|
data = b"X" * (MIN_AUDIO_BYTES + 1024)
|
|
client.session = MagicMock()
|
|
client.session.get.return_value = _fake_chunked_response(data)
|
|
engine = MagicMock()
|
|
client._engine = engine
|
|
|
|
out = tmp_path / "output.flac"
|
|
counter = itertools.count(0.0, 1.0)
|
|
with patch("core.amazon_download_client.time") as mock_time:
|
|
mock_time.monotonic.side_effect = lambda: next(counter)
|
|
client._stream_to_file("https://example.com/t.flac", out, "dl-001")
|
|
|
|
assert engine.update_record.called
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _decrypt_with_ffmpeg
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDecryptWithFfmpeg:
|
|
def test_calls_ffmpeg_with_key(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
enc = tmp_path / "track.enc.flac"
|
|
enc.write_bytes(b"encrypted")
|
|
out = tmp_path / "track.flac"
|
|
|
|
with patch("shutil.which", return_value="/usr/bin/ffmpeg"):
|
|
with patch("subprocess.run") as mock_run:
|
|
mock_run.return_value = MagicMock(returncode=0, stderr=b"")
|
|
client._decrypt_with_ffmpeg(enc, out, "deadbeef1234")
|
|
|
|
cmd = mock_run.call_args[0][0]
|
|
assert "ffmpeg" in cmd[0]
|
|
assert "-decryption_key" in cmd
|
|
assert "deadbeef1234" in cmd
|
|
assert str(enc) in cmd
|
|
assert str(out) in cmd
|
|
|
|
def test_raises_on_ffmpeg_failure(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
enc = tmp_path / "track.enc.flac"
|
|
enc.write_bytes(b"bad")
|
|
out = tmp_path / "track.flac"
|
|
|
|
with patch("shutil.which", return_value="/usr/bin/ffmpeg"):
|
|
with patch("subprocess.run") as mock_run:
|
|
mock_run.return_value = MagicMock(
|
|
returncode=1, stderr=b"Invalid data found"
|
|
)
|
|
with pytest.raises(RuntimeError, match="FFmpeg decryption failed"):
|
|
client._decrypt_with_ffmpeg(enc, out, "deadbeef1234")
|
|
|
|
def test_raises_when_ffmpeg_missing(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
enc = tmp_path / "track.enc.flac"
|
|
out = tmp_path / "track.flac"
|
|
|
|
with patch("shutil.which", return_value=None):
|
|
# Ensure tools/ffmpeg.exe also absent
|
|
with pytest.raises(RuntimeError, match="ffmpeg is required"):
|
|
client._decrypt_with_ffmpeg(enc, out, "deadbeef1234")
|
|
|
|
def test_uses_tools_ffmpeg_when_not_on_path(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
enc = tmp_path / "track.enc.flac"
|
|
enc.write_bytes(b"enc")
|
|
out = tmp_path / "track.flac"
|
|
|
|
fake_ffmpeg = tmp_path / "ffmpeg.exe"
|
|
fake_ffmpeg.touch()
|
|
|
|
with patch("shutil.which", return_value=None):
|
|
with patch(
|
|
"core.amazon_download_client.Path.__file__",
|
|
create=True,
|
|
):
|
|
import os as _os
|
|
is_nt = _os.name == "nt"
|
|
ffmpeg_name = "ffmpeg.exe" if is_nt else "ffmpeg"
|
|
tools_dir = ROOT / "tools"
|
|
tools_ffmpeg = tools_dir / ffmpeg_name
|
|
if tools_ffmpeg.exists():
|
|
with patch("subprocess.run") as mock_run:
|
|
mock_run.return_value = MagicMock(returncode=0, stderr=b"")
|
|
client._decrypt_with_ffmpeg(enc, out, "aabbcc")
|
|
assert str(tools_ffmpeg) in mock_run.call_args[0][0][0]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _download_sync — integration of stream + decrypt
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDownloadSync:
|
|
def _setup(self, tmp_path: Path, decryption_key: Optional[str] = "deadbeef"):
|
|
client = _make_client(tmp_path)
|
|
stream = _stream_info(decryption_key=decryption_key)
|
|
client._client = MagicMock()
|
|
client._client.media_from_asin.return_value = [stream]
|
|
client._client.preferred_codec = "flac"
|
|
|
|
audio_data = b"A" * (MIN_AUDIO_BYTES + 1024)
|
|
client.session = MagicMock()
|
|
client.session.get.return_value = _fake_chunked_response(audio_data)
|
|
return client, audio_data
|
|
|
|
def test_returns_output_path_on_success(self, tmp_path):
|
|
client, audio_data = self._setup(tmp_path)
|
|
|
|
with patch("shutil.which", return_value="/usr/bin/ffmpeg"):
|
|
with patch("subprocess.run") as mock_run:
|
|
mock_run.return_value = MagicMock(returncode=0, stderr=b"")
|
|
# simulate ffmpeg writing output file
|
|
def _ffmpeg_side_effect(cmd, capture_output=False):
|
|
# Write dummy decrypted data
|
|
out_path = Path(cmd[-1])
|
|
out_path.write_bytes(audio_data)
|
|
return MagicMock(returncode=0, stderr=b"")
|
|
|
|
mock_run.side_effect = _ffmpeg_side_effect
|
|
result = client._download_sync("dl-001", "B09XYZ1234", "Kendrick Lamar - Not Like Us")
|
|
|
|
assert result is not None
|
|
assert Path(result).exists()
|
|
assert Path(result).suffix == ".flac"
|
|
|
|
def test_tries_next_codec_when_stream_unavailable(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
# FLAC: no streamable results; Opus: success
|
|
flac_stream = _stream_info(streamable=False, codec="FLAC")
|
|
opus_stream = _stream_info(streamable=True, codec="OPUS",
|
|
stream_url="https://cdn.example.com/t.opus",
|
|
decryption_key=None)
|
|
client._client = MagicMock()
|
|
client._client.preferred_codec = "flac"
|
|
|
|
def _media(asin, codec):
|
|
if codec == "flac":
|
|
return [flac_stream]
|
|
if codec == "opus":
|
|
return [opus_stream]
|
|
return []
|
|
|
|
client._client.media_from_asin.side_effect = _media
|
|
|
|
audio_data = b"B" * (MIN_AUDIO_BYTES + 1024)
|
|
client.session = MagicMock()
|
|
client.session.get.return_value = _fake_chunked_response(audio_data)
|
|
|
|
result = client._download_sync("dl-001", "B09XYZ1234", "Kendrick - Track")
|
|
|
|
assert result is not None
|
|
assert ".opus" in result
|
|
|
|
def test_returns_none_when_file_too_small(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
client._client = MagicMock()
|
|
client._client.preferred_codec = "flac"
|
|
client._client.media_from_asin.return_value = [
|
|
_stream_info(decryption_key=None)
|
|
]
|
|
tiny_data = b"X" * 100 # way below MIN_AUDIO_BYTES
|
|
client.session = MagicMock()
|
|
client.session.get.return_value = _fake_chunked_response(tiny_data)
|
|
|
|
result = client._download_sync("dl-001", "B09XYZ1234", "Artist - Title")
|
|
assert result is None
|
|
|
|
def test_tries_next_codec_when_media_fails(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
client._client = MagicMock()
|
|
client._client.preferred_codec = "flac"
|
|
|
|
call_count = {"n": 0}
|
|
|
|
def _media(asin, codec):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
raise AmazonClientError("quota exceeded")
|
|
stream = _stream_info(codec="OPUS", decryption_key=None)
|
|
return [stream]
|
|
|
|
client._client.media_from_asin.side_effect = _media
|
|
|
|
audio_data = b"C" * (MIN_AUDIO_BYTES + 1024)
|
|
client.session = MagicMock()
|
|
client.session.get.return_value = _fake_chunked_response(audio_data)
|
|
|
|
result = client._download_sync("dl-001", "B09XYZ1234", "Artist - Track")
|
|
assert result is not None
|
|
|
|
def test_decryption_failure_tries_next_codec(self, tmp_path):
|
|
client, audio_data = self._setup(tmp_path, decryption_key="badkey")
|
|
|
|
with patch("shutil.which", return_value="/usr/bin/ffmpeg"):
|
|
with patch("subprocess.run") as mock_run:
|
|
# First codec (flac) decryption fails; no more codecs succeed
|
|
mock_run.return_value = MagicMock(returncode=1, stderr=b"decrypt error")
|
|
result = client._download_sync("dl-001", "B09XYZ1234", "Artist - Track")
|
|
|
|
# All codecs should fail since we return the same bad result for all
|
|
assert result is None
|
|
|
|
def test_updates_engine_state(self, tmp_path):
|
|
client, audio_data = self._setup(tmp_path, decryption_key=None)
|
|
engine = MagicMock()
|
|
client._engine = engine
|
|
|
|
result = client._download_sync("dl-001", "B09XYZ1234", "Artist - Track")
|
|
|
|
update_calls = engine.update_record.call_args_list
|
|
states = [c[0][2].get("state") for c in update_calls if "state" in c[0][2]]
|
|
assert "downloading" in states
|
|
|
|
def test_final_size_update_syncs_transferred_to_size(self, tmp_path):
|
|
"""size == transferred after success so monitor bytes-incomplete guard doesn't block."""
|
|
client, audio_data = self._setup(tmp_path, decryption_key=None)
|
|
engine = MagicMock()
|
|
client._engine = engine
|
|
|
|
result = client._download_sync("dl-001", "B09XYZ1234", "Artist - Track")
|
|
|
|
assert result is not None
|
|
# Last update must set size == transferred (both equal the output file size)
|
|
final_calls = [
|
|
c[0][2] for c in engine.update_record.call_args_list
|
|
if 'size' in c[0][2] and 'transferred' in c[0][2]
|
|
and c[0][2]['size'] == c[0][2]['transferred']
|
|
and c[0][2]['size'] > 0
|
|
]
|
|
assert final_calls, "Expected a final engine update with size == transferred > 0"
|
|
|
|
def test_clear_stream_skips_ffmpeg(self, tmp_path):
|
|
client, audio_data = self._setup(tmp_path, decryption_key=None)
|
|
|
|
with patch("subprocess.run") as mock_run:
|
|
result = client._download_sync("dl-001", "B09XYZ1234", "Artist - Track")
|
|
|
|
mock_run.assert_not_called()
|
|
assert result is not None
|
|
|
|
def test_safe_filename_sanitisation(self, tmp_path):
|
|
client, audio_data = self._setup(tmp_path, decryption_key=None)
|
|
result = client._download_sync(
|
|
"dl-001", "B09XYZ1234", "Björk / Sigur Rós: Hvarf<>Heim"
|
|
)
|
|
assert result is not None
|
|
# Path must not contain illegal filesystem chars
|
|
path = Path(result)
|
|
assert "/" not in path.name
|
|
assert "<" not in path.name
|
|
assert ">" not in path.name
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# download() — async dispatch
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDownloadDispatch:
|
|
def test_raises_without_engine(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
with pytest.raises(RuntimeError, match="_engine"):
|
|
run(client.download("amazon", "B09XYZ1234||Artist - Title"))
|
|
|
|
def test_returns_none_on_bad_filename(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
client._engine = MagicMock()
|
|
result = run(client.download("amazon", "no-pipe-delimiter-here"))
|
|
assert result is None
|
|
|
|
def test_dispatches_to_engine_worker(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
engine = MagicMock()
|
|
engine.worker.dispatch.return_value = "dl-abc123"
|
|
client._engine = engine
|
|
|
|
result = run(client.download("amazon", "B09XYZ1234||Kendrick Lamar - Not Like Us"))
|
|
|
|
assert result == "dl-abc123"
|
|
dispatch_call = engine.worker.dispatch.call_args
|
|
assert dispatch_call[1]["source_name"] == "amazon"
|
|
assert dispatch_call[1]["target_id"] == "B09XYZ1234"
|
|
assert dispatch_call[1]["display_name"] == "Kendrick Lamar - Not Like Us"
|
|
assert dispatch_call[1]["impl_callable"] == client._download_sync
|
|
|
|
def test_strips_whitespace_from_asin_and_name(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
engine = MagicMock()
|
|
engine.worker.dispatch.return_value = "dl-xyz"
|
|
client._engine = engine
|
|
|
|
run(client.download("amazon", " B09XYZ1234 || Artist - Title "))
|
|
|
|
call = engine.worker.dispatch.call_args
|
|
assert call[1]["target_id"] == "B09XYZ1234"
|
|
assert call[1]["display_name"] == "Artist - Title"
|
|
|
|
def test_set_engine_wires_engine(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
assert client._engine is None
|
|
engine = MagicMock()
|
|
client.set_engine(engine)
|
|
assert client._engine is engine
|
|
|
|
def test_set_shutdown_check_wires_callback(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
assert client.shutdown_check is None
|
|
check = lambda: False
|
|
client.set_shutdown_check(check)
|
|
assert client.shutdown_check is check
|
|
|
|
def test_set_engine_allows_download_dispatch(self, tmp_path):
|
|
"""set_engine() must unblock download() — the live failure mode."""
|
|
client = _make_client(tmp_path)
|
|
engine = MagicMock()
|
|
engine.worker.dispatch.return_value = "dl-wired"
|
|
client.set_engine(engine)
|
|
result = run(client.download("amazon", "B09XYZ1234||Artist - Title"))
|
|
assert result == "dl-wired"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Status interface
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStatusInterface:
|
|
def test_get_all_downloads_empty_without_engine(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
result = run(client.get_all_downloads())
|
|
assert result == []
|
|
|
|
def test_get_all_downloads_converts_records(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
engine = MagicMock()
|
|
engine.iter_records_for_source.return_value = iter([{
|
|
"id": "dl-001",
|
|
"filename": "B1||A - T",
|
|
"state": "complete",
|
|
"progress": 1.0,
|
|
"size": 5_000_000,
|
|
"transferred": 5_000_000,
|
|
"speed": 0,
|
|
}])
|
|
client._engine = engine
|
|
|
|
statuses = run(client.get_all_downloads())
|
|
assert len(statuses) == 1
|
|
assert statuses[0].id == "dl-001"
|
|
assert statuses[0].state == "complete"
|
|
engine.iter_records_for_source.assert_called_once_with('amazon')
|
|
|
|
def test_get_download_status_returns_none_without_engine(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
assert run(client.get_download_status("dl-001")) is None
|
|
|
|
def test_get_download_status_hit(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
engine = MagicMock()
|
|
engine.get_record.return_value = {
|
|
"id": "dl-001",
|
|
"filename": "B1||A - T",
|
|
"state": "downloading",
|
|
"progress": 0.7,
|
|
"size": 10_000_000,
|
|
"transferred": 7_000_000,
|
|
"speed": 500_000,
|
|
}
|
|
client._engine = engine
|
|
|
|
status = run(client.get_download_status("dl-001"))
|
|
assert status is not None
|
|
assert status.id == "dl-001"
|
|
assert status.state == "downloading"
|
|
assert status.progress == 0.7
|
|
engine.get_record.assert_called_once_with('amazon', 'dl-001')
|
|
|
|
def test_get_download_status_miss(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
engine = MagicMock()
|
|
engine.get_record.return_value = None
|
|
client._engine = engine
|
|
|
|
assert run(client.get_download_status("nonexistent")) is None
|
|
|
|
def test_cancel_returns_false_without_engine(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
assert run(client.cancel_download("dl-001")) is False
|
|
|
|
def test_cancel_delegates_to_engine(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
engine = MagicMock()
|
|
engine.get_record.return_value = {'id': 'dl-001', 'state': 'downloading'}
|
|
client._engine = engine
|
|
|
|
result = run(client.cancel_download("dl-001", remove=True))
|
|
assert result is True
|
|
engine.update_record.assert_called_once_with('amazon', 'dl-001', {'state': 'Cancelled'})
|
|
engine.remove_record.assert_called_once_with('amazon', 'dl-001')
|
|
|
|
def test_cancel_returns_false_when_record_not_found(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
engine = MagicMock()
|
|
engine.get_record.return_value = None
|
|
client._engine = engine
|
|
|
|
result = run(client.cancel_download("nonexistent"))
|
|
assert result is False
|
|
engine.update_record.assert_not_called()
|
|
|
|
def test_clear_completed_returns_true_without_engine(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
assert run(client.clear_all_completed_downloads()) is True
|
|
|
|
def test_clear_completed_removes_terminal_records(self, tmp_path):
|
|
client = _make_client(tmp_path)
|
|
engine = MagicMock()
|
|
engine.iter_records_for_source.return_value = iter([
|
|
{'id': 'dl-001', 'state': 'Completed, Succeeded'},
|
|
{'id': 'dl-002', 'state': 'InProgress, Downloading'},
|
|
{'id': 'dl-003', 'state': 'Errored'},
|
|
])
|
|
client._engine = engine
|
|
|
|
result = run(client.clear_all_completed_downloads())
|
|
assert result is True
|
|
# Only terminal records removed
|
|
removed_ids = [c[0][1] for c in engine.remove_record.call_args_list]
|
|
assert 'dl-001' in removed_ids
|
|
assert 'dl-003' in removed_ids
|
|
assert 'dl-002' not in removed_ids
|
|
|
|
def test_status_methods_no_records(self, tmp_path):
|
|
"""Engine with no Amazon records returns empty/None gracefully."""
|
|
client = _make_client(tmp_path)
|
|
engine = MagicMock()
|
|
engine.iter_records_for_source.return_value = iter([])
|
|
engine.get_record.return_value = None
|
|
client._engine = engine
|
|
|
|
assert run(client.get_all_downloads()) == []
|
|
assert run(client.get_download_status("dl-001")) is None
|
|
assert run(client.cancel_download("dl-001")) is False
|
|
assert run(client.clear_all_completed_downloads()) is True
|