You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/download_plugins/usenet.py

462 lines
18 KiB

"""UsenetDownloadPlugin — composes Prowlarr search + usenet client
adapter + archive_pipeline into a uniform download source.
Mirrors ``TorrentDownloadPlugin`` in shape and lifecycle (see that
module's docstring for the full pipeline rationale). Differences:
- Search filters Prowlarr results to ``protocol='usenet'``.
- ``add_nzb`` replaces ``add_torrent``; for NZBs we usually have
a direct HTTP URL the indexer exposes via Prowlarr.
- Usenet clients (SABnzbd, NZBGet) typically auto-extract during
post-processing, so ``archive_pipeline.collect_audio_after_extraction``
usually has nothing to extract and just walks loose files.
"""
from __future__ import annotations
import threading
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from core.archive_pipeline import collect_audio_after_extraction
from core.download_plugins.album_bundle import (
copy_audio_files_atomically,
pick_best_album_release,
)
from core.download_plugins.base import DownloadSourcePlugin
from core.download_plugins.torrent import (
_adapter_state_to_display,
_decode_filename,
_guess_quality_from_title,
_parse_indexer_id_filter,
_parse_release_title,
_row_to_status,
_COMPLETE_STATES,
_FILENAME_SEP,
_POLL_INTERVAL_SECONDS,
_POLL_TIMEOUT_SECONDS,
)
from core.download_plugins.types import AlbumResult, DownloadStatus, TrackResult
from core.prowlarr_client import (
DEFAULT_MUSIC_CATEGORIES,
ProwlarrClient,
ProwlarrSearchResult,
)
from core.usenet_clients import get_active_adapter as get_active_usenet_adapter
from utils.async_helpers import run_async
from utils.logging_config import get_logger
logger = get_logger("download_plugins.usenet")
class UsenetDownloadPlugin(DownloadSourcePlugin):
"""Usenet download source backed by Prowlarr + an active usenet
client adapter (SABnzbd or NZBGet)."""
def __init__(self) -> None:
self._prowlarr = ProwlarrClient()
self.active_downloads: Dict[str, Dict[str, Any]] = {}
self._lock = threading.Lock()
self.shutdown_check = None
def set_shutdown_check(self, check_callable):
self.shutdown_check = check_callable
def reload_settings(self) -> None:
self._prowlarr.reload_settings()
def is_configured(self) -> bool:
if not self._prowlarr.is_configured():
return False
adapter = get_active_usenet_adapter()
return bool(adapter and adapter.is_configured())
async def check_connection(self) -> bool:
if not self._prowlarr.is_configured():
return False
adapter = get_active_usenet_adapter()
if not adapter or not adapter.is_configured():
return False
if not await self._prowlarr.check_connection():
return False
return await adapter.check_connection()
# ------------------------------------------------------------------
# Search
# ------------------------------------------------------------------
async def search(
self,
query: str,
timeout: Optional[int] = None,
progress_callback=None,
) -> Tuple[List[TrackResult], List[AlbumResult]]:
if not self._prowlarr.is_configured():
return ([], [])
try:
indexer_ids = _parse_indexer_id_filter()
results = await self._prowlarr.search(
query,
categories=DEFAULT_MUSIC_CATEGORIES,
indexer_ids=indexer_ids,
)
except Exception as e:
logger.error("Usenet plugin search failed: %s", e)
return ([], [])
return self._project_results(results)
def _project_results(
self, results: List[ProwlarrSearchResult]
) -> Tuple[List[TrackResult], List[AlbumResult]]:
tracks: List[TrackResult] = []
albums: List[AlbumResult] = []
for result in results:
if result.protocol != 'usenet':
continue
if not result.download_url:
continue
filename = f"{result.download_url}{_FILENAME_SEP}{result.title}"
quality = _guess_quality_from_title(result.title)
parsed_artist, parsed_title = _parse_release_title(result.title)
tr = TrackResult(
username='usenet',
filename=filename,
size=result.size,
bitrate=None,
duration=None,
quality=quality,
# Usenet doesn't expose per-uploader concurrency the way
# Soulseek does; fill in neutral non-punishing values.
free_upload_slots=1,
upload_speed=0,
queue_length=0,
# Pre-fill artist + title so TrackResult.__post_init__
# doesn't auto-parse the filename — same URL-in-filename
# gotcha as the torrent plugin.
artist=parsed_artist or result.indexer_name or 'Usenet',
title=parsed_title or result.title,
album=parsed_title or None,
track_number=None,
_source_metadata={
'indexer': result.indexer_name,
'indexer_id': result.indexer_id,
'grabs': result.grabs,
'protocol': 'usenet',
},
)
tracks.append(tr)
albums.append(AlbumResult(
username='usenet',
album_path=f"usenet/{result.guid}",
album_title=parsed_title or result.title,
artist=parsed_artist or None,
track_count=1,
total_size=result.size,
tracks=[tr],
dominant_quality=quality,
year=None,
))
return tracks, albums
# ------------------------------------------------------------------
# Download
# ------------------------------------------------------------------
async def download(
self,
username: str,
filename: str,
file_size: int = 0,
) -> Optional[str]:
if not self.is_configured():
return None
nzb_url, display_name = _decode_filename(filename)
if not nzb_url:
logger.error("Usenet download missing URL in filename: %r", filename)
return None
download_id = str(uuid.uuid4())
with self._lock:
self.active_downloads[download_id] = {
'id': download_id,
'filename': filename,
'username': 'usenet',
'display_name': display_name,
'state': 'Initializing',
'progress': 0.0,
'size': file_size,
'transferred': 0,
'speed': 0,
'file_path': None,
'audio_files': [],
'job_id': None,
'error': None,
}
thread = threading.Thread(
target=self._download_thread,
args=(download_id, nzb_url),
daemon=True,
name=f'usenet-dl-{download_id[:8]}',
)
thread.start()
return download_id
def _download_thread(self, download_id: str, nzb_url: str) -> None:
adapter = get_active_usenet_adapter()
if adapter is None or not adapter.is_configured():
self._mark_error(download_id, "No usenet client configured")
return
try:
job_id = run_async(adapter.add_nzb(nzb_url))
except Exception as e:
self._mark_error(download_id, f"add_nzb failed: {e}")
return
if not job_id:
self._mark_error(download_id, "Usenet client refused the NZB")
return
with self._lock:
row = self.active_downloads.get(download_id)
if row is not None:
row['job_id'] = job_id
row['state'] = 'InProgress, Downloading'
deadline = time.monotonic() + _POLL_TIMEOUT_SECONDS
last_save_path: Optional[str] = None
while time.monotonic() < deadline:
if self.shutdown_check and self.shutdown_check():
return
try:
status = run_async(adapter.get_status(job_id))
except Exception as e:
logger.warning("Usenet poll error for %s: %s", job_id, e)
status = None
if status is None:
self._mark_error(download_id, "Usenet job disappeared from client")
return
with self._lock:
row = self.active_downloads.get(download_id)
if row is not None:
row['progress'] = status.progress * 100.0
row['transferred'] = status.downloaded
row['speed'] = status.download_speed
row['size'] = status.size or row.get('size', 0)
row['state'] = _adapter_state_to_display(status.state)
row['error'] = status.error
if status.save_path:
last_save_path = status.save_path
if status.state in _COMPLETE_STATES:
self._finalize_download(download_id, last_save_path)
return
if status.state == 'failed':
self._mark_error(download_id, status.error or "Usenet client reported failure")
return
time.sleep(_POLL_INTERVAL_SECONDS)
self._mark_error(download_id, "Usenet download timed out")
def _finalize_download(self, download_id: str, save_path: Optional[str]) -> None:
if not save_path:
self._mark_error(download_id, "Usenet job completed but no save_path reported")
return
try:
audio_files = collect_audio_after_extraction(Path(save_path))
except Exception as e:
self._mark_error(download_id, f"Post-extract walk failed: {e}")
return
if not audio_files:
self._mark_error(download_id, f"No audio files found in {save_path}")
return
primary = audio_files[0]
with self._lock:
row = self.active_downloads.get(download_id)
if row is not None:
row['state'] = 'Completed, Succeeded'
row['progress'] = 100.0
row['file_path'] = str(primary)
row['audio_files'] = [str(path) for path in audio_files]
logger.info("Usenet download complete: %s -> %s (%d audio files)",
download_id[:8], primary.name, len(audio_files))
def _mark_error(self, download_id: str, message: str) -> None:
logger.error("Usenet download %s failed: %s", download_id[:8], message)
with self._lock:
row = self.active_downloads.get(download_id)
if row is not None:
row['state'] = 'Completed, Errored'
row['error'] = message
# ------------------------------------------------------------------
# Status / lifecycle
# ------------------------------------------------------------------
async def get_all_downloads(self) -> List[DownloadStatus]:
with self._lock:
rows = list(self.active_downloads.values())
return [_row_to_status(r) for r in rows]
async def get_download_status(self, download_id: str) -> Optional[DownloadStatus]:
with self._lock:
row = self.active_downloads.get(download_id)
if row is None:
return None
return _row_to_status(row)
async def cancel_download(
self,
download_id: str,
username: Optional[str] = None,
remove: bool = False,
) -> bool:
adapter = get_active_usenet_adapter()
with self._lock:
row = self.active_downloads.get(download_id)
job_id = row.get('job_id') if row else None
if adapter and job_id:
try:
await adapter.remove(job_id, delete_files=remove)
except Exception as e:
logger.warning("Usenet cancel via adapter failed: %s", e)
with self._lock:
if remove:
self.active_downloads.pop(download_id, None)
else:
row = self.active_downloads.get(download_id)
if row is not None:
row['state'] = 'Cancelled'
return True
async def clear_all_completed_downloads(self) -> bool:
with self._lock:
for did in list(self.active_downloads.keys()):
state = self.active_downloads[did].get('state', '')
if state.startswith('Completed') or state == 'Cancelled':
self.active_downloads.pop(did, None)
return True
# ------------------------------------------------------------------
# Album-bundle flow
# ------------------------------------------------------------------
def download_album_to_staging(
self,
album_name: str,
artist_name: str,
staging_dir: str,
progress_callback=None,
) -> Dict[str, Any]:
"""Usenet sibling of ``TorrentDownloadPlugin.download_album_to_staging``.
See that method's docstring for the contract."""
result: Dict[str, Any] = {'success': False, 'files': [], 'error': None}
if not self.is_configured():
result['error'] = 'Usenet source not configured'
return result
adapter = get_active_usenet_adapter()
if adapter is None or not adapter.is_configured():
result['error'] = 'No active usenet client'
return result
def _emit(state: str, **extra) -> None:
if progress_callback:
try:
progress_callback({'state': state, **extra})
except Exception as cb_exc:
logger.debug("[Usenet album] progress callback failed: %s", cb_exc)
query = f"{artist_name} {album_name}".strip()
_emit('searching', query=query)
try:
search_results = run_async(self._prowlarr.search(
query, categories=DEFAULT_MUSIC_CATEGORIES,
indexer_ids=_parse_indexer_id_filter(),
))
except Exception as e:
result['error'] = f'Prowlarr search failed: {e}'
return result
candidates = [r for r in search_results
if r.protocol == 'usenet' and r.download_url]
if not candidates:
result['error'] = f'No usenet results found for "{query}"'
return result
picked = pick_best_album_release(candidates, _guess_quality_from_title)
if picked is None:
result['error'] = 'No suitable NZB candidate after filtering'
return result
logger.info("[Usenet album] Picked '%s' (size=%.1fMB grabs=%s indexer=%s)",
picked.title, picked.size / 1_048_576, picked.grabs, picked.indexer_name)
_emit('queued', release=picked.title, size=picked.size, grabs=picked.grabs)
try:
job_id = run_async(adapter.add_nzb(picked.download_url))
except Exception as e:
result['error'] = f'Usenet client refused the NZB: {e}'
return result
if not job_id:
result['error'] = 'Usenet client refused the NZB'
return result
_emit('downloading', release=picked.title)
save_path = self._poll_album_download(adapter, job_id, picked.title, _emit)
if save_path is None:
result['error'] = 'Usenet download failed or timed out'
return result
_emit('staging', release=picked.title)
try:
audio_files = collect_audio_after_extraction(Path(save_path))
except Exception as e:
result['error'] = f'Failed to walk audio files: {e}'
return result
if not audio_files:
result['error'] = f'No audio files found in {save_path}'
return result
copied = copy_audio_files_atomically(audio_files, Path(staging_dir))
if not copied:
result['error'] = 'No audio files copied to staging'
return result
logger.info("[Usenet album] Staged %d audio files for '%s'", len(copied), album_name)
_emit('staged', count=len(copied))
result['success'] = True
result['files'] = copied
return result
def _poll_album_download(self, adapter, job_id, title, emit) -> Optional[str]:
deadline = time.monotonic() + _POLL_TIMEOUT_SECONDS
last_save_path: Optional[str] = None
while time.monotonic() < deadline:
if self.shutdown_check and self.shutdown_check():
return None
try:
status = run_async(adapter.get_status(job_id))
except Exception as e:
logger.warning("[Usenet album] Poll error: %s", e)
status = None
if status is None:
logger.error("[Usenet album] '%s' disappeared from client", title)
return None
emit('downloading', progress=status.progress, downloaded=status.downloaded,
speed=status.download_speed)
if status.save_path:
last_save_path = status.save_path
if status.state in _COMPLETE_STATES:
return last_save_path
if status.state == 'failed':
logger.error("[Usenet album] '%s' failed: %s", title, status.error)
return None
time.sleep(_POLL_INTERVAL_SECONDS)
logger.error("[Usenet album] '%s' timed out", title)
return None