mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
453 lines
17 KiB
453 lines
17 KiB
"""Amazon Music download source plugin backed by a T2Tunes proxy.
|
|
|
|
NOT yet wired into the app registry — validated in isolation only.
|
|
See tests/tools/test_amazon_download_client.py.
|
|
|
|
Filename encoding (the DownloadSourcePlugin dispatch contract):
|
|
"{asin}||{display_name}"
|
|
e.g. "B09XYZ1234||Kendrick Lamar - Not Like Us"
|
|
|
|
Codec preference order: FLAC → Opus → EAC3.
|
|
|
|
Download flow (from Tubifarry reference implementation):
|
|
1. GET stream_url → encrypted bytes on disk
|
|
2. FFmpeg -decryption_key <hex> to decrypt in place
|
|
3. Embed metadata tags (handled by the app's standard post-processing)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import requests as http_requests
|
|
|
|
from config.settings import config_manager
|
|
from core.amazon_client import AmazonClient, AmazonClientError
|
|
from core.download_plugins.base import DownloadSourcePlugin
|
|
from core.download_plugins.types import AlbumResult, DownloadStatus, TrackResult
|
|
from utils.logging_config import get_logger
|
|
|
|
logger = get_logger("amazon_download_client")
|
|
|
|
# Quality / codec helpers
|
|
CODEC_PREFERENCE = ["flac", "opus", "eac3"]
|
|
|
|
_CODEC_EXTENSIONS: Dict[str, str] = {
|
|
"flac": "flac",
|
|
"ogg_vorbis": "ogg",
|
|
"opus": "opus",
|
|
"eac3": "eac3",
|
|
"mp4": "m4a",
|
|
"aac": "m4a",
|
|
"mp3": "mp3",
|
|
}
|
|
|
|
MIN_AUDIO_BYTES = 512 * 1024 # 512 KB — anything smaller is a broken stream
|
|
|
|
|
|
def _codec_key(codec: str) -> str:
|
|
return codec.lower().replace("-", "_").replace(" ", "_")
|
|
|
|
|
|
def _file_extension(codec: str) -> str:
|
|
return _CODEC_EXTENSIONS.get(_codec_key(codec), "bin")
|
|
|
|
|
|
def _quality_label(codec: str, sample_rate: Optional[int] = None) -> str:
|
|
ck = _codec_key(codec)
|
|
if ck == "flac":
|
|
if sample_rate and sample_rate > 48000:
|
|
return "Hi-Res"
|
|
return "Lossless"
|
|
return "Lossy"
|
|
|
|
|
|
class AmazonDownloadClient(DownloadSourcePlugin):
|
|
"""DownloadSourcePlugin — Amazon Music via T2Tunes proxy."""
|
|
|
|
def __init__(self, download_path: Optional[str] = None) -> None:
|
|
if download_path is None:
|
|
download_path = config_manager.get("soulseek.download_path", "./downloads")
|
|
self.download_path = Path(download_path)
|
|
self.download_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
self._quality = config_manager.get("amazon_download.quality", "flac")
|
|
self._allow_fallback = config_manager.get("amazon_download.allow_fallback", True)
|
|
|
|
self._client = AmazonClient(preferred_codec=self._quality)
|
|
self.session = http_requests.Session()
|
|
self.session.headers.update({
|
|
"User-Agent": "SoulSync/1.0",
|
|
"Accept": "*/*",
|
|
})
|
|
|
|
self._engine: Any = None
|
|
self.shutdown_check: Any = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# DownloadSourcePlugin — lifecycle
|
|
# ------------------------------------------------------------------
|
|
|
|
def set_engine(self, engine) -> None:
|
|
"""Engine callback — wires the central thread worker + state store."""
|
|
self._engine = engine
|
|
|
|
def set_shutdown_check(self, check_callable) -> None:
|
|
self.shutdown_check = check_callable
|
|
|
|
def is_configured(self) -> bool:
|
|
# T2Tunes has a public default instance; no credentials required.
|
|
# Return True unconditionally so the source shows as available.
|
|
return True
|
|
|
|
async def check_connection(self) -> bool:
|
|
try:
|
|
return self._client.is_authenticated()
|
|
except Exception:
|
|
return False
|
|
|
|
# ------------------------------------------------------------------
|
|
# DownloadSourcePlugin — search
|
|
# ------------------------------------------------------------------
|
|
|
|
async def search(
|
|
self,
|
|
query: str,
|
|
timeout: Optional[int] = None,
|
|
progress_callback: Any = None,
|
|
) -> Tuple[List[TrackResult], List[AlbumResult]]:
|
|
try:
|
|
items = self._client.search_raw(query, types="track,album")
|
|
except AmazonClientError as exc:
|
|
logger.warning(f"Amazon search failed for {query!r}: {exc}")
|
|
return [], []
|
|
|
|
track_results: List[TrackResult] = []
|
|
album_map: Dict[str, AlbumResult] = {}
|
|
album_order: List[str] = []
|
|
preferred = self._client.preferred_codec
|
|
|
|
for item in items:
|
|
quality = _quality_label(preferred)
|
|
if item.is_track:
|
|
track_results.append(TrackResult(
|
|
username="amazon",
|
|
filename=f"{item.asin}||{item.artist_name} - {item.title}",
|
|
size=0,
|
|
bitrate=None,
|
|
duration=item.duration_seconds * 1000 if item.duration_seconds else None,
|
|
quality=quality,
|
|
free_upload_slots=999,
|
|
upload_speed=999_999,
|
|
queue_length=0,
|
|
artist=item.artist_name,
|
|
title=item.title,
|
|
album=item.album_name,
|
|
_source_metadata={
|
|
"asin": item.asin,
|
|
"album_asin": item.album_asin,
|
|
"isrc": item.isrc,
|
|
},
|
|
))
|
|
elif item.is_album:
|
|
album_asin = item.album_asin or item.asin
|
|
if album_asin not in album_map:
|
|
placeholder = TrackResult(
|
|
username="amazon",
|
|
filename=f"{item.asin}||{item.artist_name} - {item.title}",
|
|
size=0,
|
|
bitrate=None,
|
|
duration=None,
|
|
quality=quality,
|
|
free_upload_slots=999,
|
|
upload_speed=999_999,
|
|
queue_length=0,
|
|
artist=item.artist_name,
|
|
title=item.title,
|
|
album=item.album_name,
|
|
)
|
|
album_map[album_asin] = AlbumResult(
|
|
username="amazon",
|
|
album_path=album_asin,
|
|
album_title=item.album_name or item.title,
|
|
artist=item.artist_name,
|
|
track_count=0,
|
|
total_size=0,
|
|
tracks=[placeholder],
|
|
dominant_quality=quality,
|
|
)
|
|
album_order.append(album_asin)
|
|
|
|
return track_results, [album_map[k] for k in album_order]
|
|
|
|
# ------------------------------------------------------------------
|
|
# DownloadSourcePlugin — download dispatch
|
|
# ------------------------------------------------------------------
|
|
|
|
async def download(
|
|
self,
|
|
username: str,
|
|
filename: str,
|
|
file_size: int = 0,
|
|
) -> Optional[str]:
|
|
if "||" not in filename:
|
|
logger.error(f"Invalid Amazon filename format (no '||'): {filename!r}")
|
|
return None
|
|
if self._engine is None:
|
|
raise RuntimeError(
|
|
"AmazonDownloadClient._engine is not set — "
|
|
"client not connected to download infrastructure"
|
|
)
|
|
asin, display_name = filename.split("||", 1)
|
|
asin = asin.strip()
|
|
display_name = display_name.strip()
|
|
return self._engine.worker.dispatch(
|
|
source_name="amazon",
|
|
target_id=asin,
|
|
display_name=display_name,
|
|
original_filename=filename,
|
|
impl_callable=self._download_sync,
|
|
)
|
|
|
|
def _download_sync(
|
|
self,
|
|
download_id: str,
|
|
target_id: str,
|
|
display_name: str,
|
|
) -> Optional[str]:
|
|
asin = str(target_id)
|
|
codecs = CODEC_PREFERENCE if self._allow_fallback else [self._quality]
|
|
for codec in codecs:
|
|
try:
|
|
streams = self._client.media_from_asin(asin, codec=codec)
|
|
except AmazonClientError as exc:
|
|
logger.warning(f"media_from_asin({asin!r}, {codec}) failed: {exc}")
|
|
continue
|
|
|
|
stream = next(
|
|
(s for s in streams if s.streamable and s.stream_url),
|
|
None,
|
|
)
|
|
if not stream:
|
|
logger.debug(f"No streamable result for {asin} at codec={codec}")
|
|
continue
|
|
|
|
ext = _file_extension(stream.codec or codec)
|
|
safe = "".join(
|
|
ch if ch.isalnum() or ch in " -_." else "_"
|
|
for ch in display_name
|
|
)[:80]
|
|
# T2Tunes always serves audio in an encrypted MP4 container.
|
|
# The codec extension (.flac, .opus, .eac3) is only for the
|
|
# final decrypted output.
|
|
enc_ext = "mp4" if stream.decryption_key else ext
|
|
enc_path = self._unique_path(self.download_path / f"{safe}.enc.{enc_ext}")
|
|
out_path = self._unique_path(self.download_path / f"{safe}.{ext}")
|
|
|
|
if self._engine is not None:
|
|
self._engine.update_record(
|
|
"amazon", download_id, {"state": "downloading", "progress": 0.0}
|
|
)
|
|
try:
|
|
downloaded = self._stream_to_file(stream.stream_url, enc_path, download_id)
|
|
except Exception as exc:
|
|
logger.warning(f"Stream download failed for {asin} ({codec}): {exc}")
|
|
enc_path.unlink(missing_ok=True)
|
|
continue
|
|
|
|
if downloaded < MIN_AUDIO_BYTES:
|
|
logger.warning(
|
|
f"File too small ({downloaded} B) for {asin} at {codec} — trying next"
|
|
)
|
|
enc_path.unlink(missing_ok=True)
|
|
continue
|
|
|
|
if stream.decryption_key:
|
|
if self._engine is not None:
|
|
self._engine.update_record(
|
|
"amazon", download_id, {"state": "decrypting", "progress": 1.0}
|
|
)
|
|
try:
|
|
self._decrypt_with_ffmpeg(enc_path, out_path, stream.decryption_key)
|
|
enc_path.unlink(missing_ok=True)
|
|
except Exception as exc:
|
|
logger.error(f"Decryption failed for {asin} ({codec}): {exc}")
|
|
enc_path.unlink(missing_ok=True)
|
|
out_path.unlink(missing_ok=True)
|
|
continue
|
|
else:
|
|
enc_path.rename(out_path)
|
|
|
|
final_size = out_path.stat().st_size
|
|
logger.info(
|
|
f"Amazon download complete ({codec}): {out_path} "
|
|
f"({final_size / (1024 * 1024):.1f} MB)"
|
|
)
|
|
# Sync size == transferred so the download monitor's bytes-incomplete
|
|
# guard doesn't block post-processing. The throttled updates in
|
|
# _stream_to_file leave transferred < size after the last 0.5s tick;
|
|
# other streaming clients avoid this by not tracking bytes at all
|
|
# (size stays 0, the guard is skipped). Writing the final output size
|
|
# here restores parity.
|
|
if self._engine is not None:
|
|
self._engine.update_record(
|
|
"amazon", download_id, {'size': final_size, 'transferred': final_size}
|
|
)
|
|
return str(out_path)
|
|
|
|
logger.error(f"All codec tiers exhausted for '{display_name}' ({asin})")
|
|
return None
|
|
|
|
def _decrypt_with_ffmpeg(
|
|
self, enc_path: Path, out_path: Path, hex_key: str
|
|
) -> None:
|
|
"""Decrypt a T2Tunes encrypted audio file using FFmpeg -decryption_key.
|
|
|
|
T2Tunes uses CENC (Common Encryption) for DRM-protected tracks.
|
|
FFmpeg supports decryption via the -decryption_key flag when the
|
|
key is provided in hex.
|
|
"""
|
|
ffmpeg = shutil.which("ffmpeg")
|
|
if not ffmpeg:
|
|
tools_dir = Path(__file__).parent.parent / "tools"
|
|
candidate = tools_dir / ("ffmpeg.exe" if os.name == "nt" else "ffmpeg")
|
|
if candidate.exists():
|
|
ffmpeg = str(candidate)
|
|
else:
|
|
raise RuntimeError(
|
|
"ffmpeg is required for Amazon Music decryption. "
|
|
"Install ffmpeg and ensure it is on your PATH."
|
|
)
|
|
cmd = [
|
|
ffmpeg,
|
|
"-y",
|
|
"-hide_banner",
|
|
"-loglevel", "error",
|
|
"-decryption_key", hex_key,
|
|
"-i", str(enc_path),
|
|
"-map", "0:a:0", # extract first audio stream (FLAC/Opus/EAC3 inside MP4)
|
|
"-c", "copy",
|
|
str(out_path),
|
|
]
|
|
logger.debug(f"Decrypting {enc_path.name} → {out_path.name}")
|
|
result = subprocess.run(cmd, capture_output=True)
|
|
if result.returncode != 0:
|
|
stderr = result.stderr.decode("utf-8", errors="replace").strip()
|
|
raise RuntimeError(f"FFmpeg decryption failed (exit {result.returncode}): {stderr}")
|
|
|
|
def _stream_to_file(self, url: str, out_path: Path, download_id: str) -> int:
|
|
resp = self.session.get(url, stream=True, timeout=60)
|
|
resp.raise_for_status()
|
|
|
|
total = int(resp.headers.get("content-length", 0))
|
|
downloaded = 0
|
|
last_report = time.monotonic()
|
|
shutdown_triggered = False
|
|
|
|
with out_path.open("wb") as fh:
|
|
for chunk in resp.iter_content(chunk_size=64 * 1024):
|
|
if not chunk:
|
|
continue
|
|
if self.shutdown_check and self.shutdown_check():
|
|
shutdown_triggered = True
|
|
break
|
|
fh.write(chunk)
|
|
downloaded += len(chunk)
|
|
now = time.monotonic()
|
|
if self._engine and now - last_report >= 0.5:
|
|
self._engine.update_record(
|
|
"amazon",
|
|
download_id,
|
|
{
|
|
"transferred": downloaded,
|
|
"size": total,
|
|
"progress": downloaded / total if total else 0.0,
|
|
},
|
|
)
|
|
last_report = now
|
|
|
|
if shutdown_triggered:
|
|
out_path.unlink(missing_ok=True)
|
|
raise RuntimeError("Shutdown requested mid-download")
|
|
|
|
return downloaded
|
|
|
|
# ------------------------------------------------------------------
|
|
# DownloadSourcePlugin — status interface
|
|
# ------------------------------------------------------------------
|
|
|
|
async def get_all_downloads(self) -> List[DownloadStatus]:
|
|
if self._engine is None:
|
|
return []
|
|
return [
|
|
self._record_to_status(record)
|
|
for record in self._engine.iter_records_for_source('amazon')
|
|
]
|
|
|
|
async def get_download_status(self, download_id: str) -> Optional[DownloadStatus]:
|
|
if self._engine is None:
|
|
return None
|
|
record = self._engine.get_record('amazon', download_id)
|
|
return self._record_to_status(record) if record is not None else None
|
|
|
|
async def cancel_download(
|
|
self,
|
|
download_id: str,
|
|
username: Optional[str] = None,
|
|
remove: bool = False,
|
|
) -> bool:
|
|
if self._engine is None:
|
|
return False
|
|
if self._engine.get_record('amazon', download_id) is None:
|
|
return False
|
|
self._engine.update_record('amazon', download_id, {'state': 'Cancelled'})
|
|
if remove:
|
|
self._engine.remove_record('amazon', download_id)
|
|
return True
|
|
|
|
async def clear_all_completed_downloads(self) -> bool:
|
|
if self._engine is None:
|
|
return True
|
|
terminal = {'Completed, Succeeded', 'Cancelled', 'Errored', 'Aborted'}
|
|
for record in list(self._engine.iter_records_for_source('amazon')):
|
|
if record.get('state') in terminal:
|
|
self._engine.remove_record('amazon', record['id'])
|
|
return True
|
|
|
|
# ------------------------------------------------------------------
|
|
# Private helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _unique_path(path: Path) -> Path:
|
|
if not path.exists():
|
|
return path
|
|
stem, suffix = path.stem, path.suffix
|
|
for i in range(1, 100):
|
|
candidate = path.with_name(f"{stem} ({i}){suffix}")
|
|
if not candidate.exists():
|
|
return candidate
|
|
return path.with_name(f"{stem}_{uuid.uuid4().hex[:8]}{suffix}")
|
|
|
|
@staticmethod
|
|
def _record_to_status(rec: Dict[str, Any]) -> DownloadStatus:
|
|
return DownloadStatus(
|
|
id=str(rec.get('id', '')),
|
|
filename=str(rec.get('filename', '')),
|
|
username='amazon',
|
|
state=str(rec.get('state', 'queued')),
|
|
progress=float(rec.get('progress', 0.0)),
|
|
size=int(rec.get('size', 0)),
|
|
transferred=int(rec.get('transferred', 0)),
|
|
speed=int(rec.get('speed', 0)),
|
|
time_remaining=rec.get('time_remaining'),
|
|
file_path=rec.get('file_path'),
|
|
)
|