mirror of https://github.com/Nezreka/SoulSync.git
Third commit in the torrent + usenet rollout. SoulSync now also
speaks the two big usenet downloaders through a sibling adapter
contract that mirrors the torrent adapter set. All three layers are
now stood up — Prowlarr finds releases, the torrent adapter and the
usenet adapter each know how to ship work to the underlying client.
A later commit wires Prowlarr search results through the adapters
and through the archive-extract-match pipeline.
- core/usenet_clients/base.py: UsenetClientAdapter Protocol +
UsenetStatus dataclass. Uniform state set covers usenet-specific
phases (queued / downloading / extracting / verifying / repairing /
completed / failed / paused).
- core/usenet_clients/__init__.py: adapter_for_type factory +
get_active_adapter that reads usenet_client.type each call.
- core/usenet_clients/sabnzbd.py: REST adapter. ?apikey=... auth,
mode=addurl and mode=addfile (multipart) for add_nzb. Reads both
the active queue and the recent history so completed / failed
jobs surface in get_all. Parses SAB's HH:MM:SS ``timeleft`` into
seconds.
- core/usenet_clients/nzbget.py: JSON-RPC adapter. HTTP Basic auth,
``append`` method for add_nzb (auto-detects URL vs base64 NZB),
``editqueue`` with GroupPause/GroupResume/GroupDelete/GroupFinalDelete
for state changes. Reads NZBGet's 64-bit split size fields
(FileSizeHi + FileSizeLo) preferentially over the legacy
FileSizeMB aggregate.
- core/connection_test.py: 'usenet_client' branch picks the right
adapter, runs check_connection, surfaces per-client error
messages (different credentials needed).
- config/settings.py: usenet_client.{type, url, api_key, username,
password, category} defaults + both api_key and password marked
encrypted-at-rest.
- web_server.py: 'usenet_client' added to the /api/settings POST
allow-list.
- webui/index.html: new Usenet Client panel on the Indexers &
Downloaders tab. Type picker swaps the credential fields between
API-key (SABnzbd) and username+password (NZBGet).
- webui/static/settings.js: load/save wiring, updateUsenetClientUI
for the credential field swap, testUsenetClientConnection.
- webui/static/helper.js: WHATS_NEW + VERSION_MODAL_SECTIONS entry.
pull/665/head
parent
de2faf290b
commit
7a3ce50f71
@ -0,0 +1,49 @@
|
||||
"""Usenet client adapters.
|
||||
|
||||
Each adapter wraps one Usenet downloader (SABnzbd, NZBGet) behind
|
||||
the ``UsenetClientAdapter`` Protocol so the rest of SoulSync can
|
||||
talk to whichever client the user picked through one uniform
|
||||
surface.
|
||||
|
||||
The active adapter is selected at runtime by the
|
||||
``usenet_client.type`` config key. See ``get_active_adapter()``
|
||||
for the factory.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from config.settings import config_manager
|
||||
|
||||
from core.usenet_clients.base import UsenetClientAdapter, UsenetStatus
|
||||
from core.usenet_clients.nzbget import NZBGetAdapter
|
||||
from core.usenet_clients.sabnzbd import SABnzbdAdapter
|
||||
|
||||
__all__ = [
|
||||
"UsenetClientAdapter",
|
||||
"UsenetStatus",
|
||||
"SABnzbdAdapter",
|
||||
"NZBGetAdapter",
|
||||
"get_active_adapter",
|
||||
"adapter_for_type",
|
||||
]
|
||||
|
||||
|
||||
def adapter_for_type(client_type: str) -> Optional[UsenetClientAdapter]:
|
||||
"""Build a fresh adapter instance for the given client type string.
|
||||
``None`` for unknown types."""
|
||||
if client_type == "sabnzbd":
|
||||
return SABnzbdAdapter()
|
||||
if client_type == "nzbget":
|
||||
return NZBGetAdapter()
|
||||
return None
|
||||
|
||||
|
||||
def get_active_adapter() -> Optional[UsenetClientAdapter]:
|
||||
"""Return an adapter for whichever usenet client the user has
|
||||
selected in Settings. Reads ``usenet_client.type`` each call."""
|
||||
client_type = (config_manager.get('usenet_client.type', '') or '').strip().lower()
|
||||
if not client_type:
|
||||
return None
|
||||
return adapter_for_type(client_type)
|
||||
@ -0,0 +1,277 @@
|
||||
"""NZBGet adapter.
|
||||
|
||||
Auth model: HTTP Basic auth on the JSON-RPC endpoint ``/jsonrpc``.
|
||||
Every method takes positional ``params``. Identical pattern to
|
||||
Deluge but with different method names.
|
||||
|
||||
Reference: https://nzbget.com/documentation/api/
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
from itertools import count
|
||||
from typing import Any, List, Optional, Union
|
||||
|
||||
import requests as http_requests
|
||||
|
||||
from config.settings import config_manager
|
||||
from core.usenet_clients.base import UsenetStatus
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger("usenet.nzbget")
|
||||
|
||||
|
||||
# NZBGet's ``Status`` field on ListGroups → adapter-uniform set.
|
||||
# NZBGet states (group): QUEUED, PAUSED, DOWNLOADING, FETCHING, PP_QUEUED,
|
||||
# LOADING_PARS, VERIFYING_SOURCES, REPAIRING, VERIFYING_REPAIRED, RENAMING,
|
||||
# UNPACKING, MOVING, EXECUTING_SCRIPT, PP_FINISHED.
|
||||
_NZBGET_STATE_MAP = {
|
||||
"QUEUED": "queued",
|
||||
"PAUSED": "paused",
|
||||
"DOWNLOADING": "downloading",
|
||||
"FETCHING": "downloading",
|
||||
"PP_QUEUED": "queued",
|
||||
"LOADING_PARS": "verifying",
|
||||
"VERIFYING_SOURCES": "verifying",
|
||||
"REPAIRING": "repairing",
|
||||
"VERIFYING_REPAIRED": "verifying",
|
||||
"RENAMING": "extracting",
|
||||
"UNPACKING": "extracting",
|
||||
"MOVING": "extracting",
|
||||
"EXECUTING_SCRIPT": "extracting",
|
||||
"PP_FINISHED": "completed",
|
||||
}
|
||||
|
||||
|
||||
def _map_state(nzbget_state: str) -> str:
|
||||
return _NZBGET_STATE_MAP.get(nzbget_state or '', "error")
|
||||
|
||||
|
||||
class NZBGetAdapter:
|
||||
"""NZBGet JSON-RPC adapter."""
|
||||
|
||||
DEFAULT_TIMEOUT = 15
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._id_counter = count(1)
|
||||
self._load_config()
|
||||
|
||||
def _load_config(self) -> None:
|
||||
self._url = (config_manager.get('usenet_client.url', '') or '').rstrip('/')
|
||||
self._username = config_manager.get('usenet_client.username', '') or ''
|
||||
self._password = config_manager.get('usenet_client.password', '') or ''
|
||||
self._category = config_manager.get('usenet_client.category', 'soulsync') or 'soulsync'
|
||||
|
||||
def reload_settings(self) -> None:
|
||||
self._load_config()
|
||||
|
||||
def is_configured(self) -> bool:
|
||||
return bool(self._url and self._username and self._password)
|
||||
|
||||
async def check_connection(self) -> bool:
|
||||
if not self.is_configured():
|
||||
return False
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self._check_connection_sync)
|
||||
|
||||
def _check_connection_sync(self) -> bool:
|
||||
return self._rpc_sync('version', []) is not None
|
||||
|
||||
def _rpc_sync(self, method: str, params: list) -> Any:
|
||||
if not self._url:
|
||||
return None
|
||||
try:
|
||||
resp = http_requests.post(
|
||||
f"{self._url}/jsonrpc",
|
||||
json={'method': method, 'params': params, 'id': next(self._id_counter)},
|
||||
auth=(self._username, self._password) if self._username else None,
|
||||
headers={'Content-Type': 'application/json'},
|
||||
timeout=self.DEFAULT_TIMEOUT,
|
||||
)
|
||||
if not resp.ok:
|
||||
logger.warning("NZBGet %s returned HTTP %s", method, resp.status_code)
|
||||
return None
|
||||
data = resp.json()
|
||||
if data.get('error'):
|
||||
logger.warning("NZBGet %s error: %r", method, data.get('error'))
|
||||
return None
|
||||
return data.get('result')
|
||||
except http_requests.exceptions.RequestException as e:
|
||||
logger.error("NZBGet %s call failed: %s", method, e)
|
||||
return None
|
||||
except ValueError as e:
|
||||
logger.error("NZBGet %s response not JSON: %s", method, e)
|
||||
return None
|
||||
|
||||
async def add_nzb(
|
||||
self,
|
||||
url_or_bytes: Union[str, bytes],
|
||||
category: str = "soulsync",
|
||||
save_path: Optional[str] = None,
|
||||
) -> Optional[str]:
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(
|
||||
None, self._add_nzb_sync, url_or_bytes, category, save_path
|
||||
)
|
||||
|
||||
def _add_nzb_sync(
|
||||
self,
|
||||
url_or_bytes: Union[str, bytes],
|
||||
category: str,
|
||||
save_path: Optional[str],
|
||||
) -> Optional[str]:
|
||||
cat = category or self._category
|
||||
# NZBGet's ``append`` takes: NZBFilename, Content, Category,
|
||||
# Priority, AddToTop, AddPaused, DupeKey, DupeScore, DupeMode,
|
||||
# PPParameters. We pass the minimum required for an unpause-on-add.
|
||||
# Content is either base64 of the raw .nzb or a URL — NZBGet
|
||||
# auto-detects which based on whether it looks like a URL.
|
||||
if isinstance(url_or_bytes, bytes):
|
||||
content = base64.b64encode(url_or_bytes).decode('ascii')
|
||||
nzb_filename = 'soulsync.nzb'
|
||||
else:
|
||||
content = url_or_bytes
|
||||
nzb_filename = ''
|
||||
params = [
|
||||
nzb_filename, # NZBFilename
|
||||
content, # Content (URL or base64 NZB)
|
||||
cat, # Category
|
||||
0, # Priority
|
||||
False, # AddToTop
|
||||
False, # AddPaused
|
||||
'', # DupeKey
|
||||
0, # DupeScore
|
||||
'SCORE', # DupeMode
|
||||
[], # PPParameters
|
||||
]
|
||||
result = self._rpc_sync('append', params)
|
||||
if isinstance(result, int) and result > 0:
|
||||
return str(result)
|
||||
return None
|
||||
|
||||
async def get_status(self, job_id: str) -> Optional[UsenetStatus]:
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self._get_status_sync, job_id)
|
||||
|
||||
def _get_status_sync(self, job_id: str) -> Optional[UsenetStatus]:
|
||||
for status in self._get_all_sync():
|
||||
if status.id == job_id:
|
||||
return status
|
||||
return None
|
||||
|
||||
async def get_all(self) -> List[UsenetStatus]:
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self._get_all_sync)
|
||||
|
||||
def _get_all_sync(self) -> List[UsenetStatus]:
|
||||
out: List[UsenetStatus] = []
|
||||
groups = self._rpc_sync('listgroups', [0])
|
||||
if isinstance(groups, list):
|
||||
for group in groups:
|
||||
out.append(self._parse_group(group))
|
||||
history = self._rpc_sync('history', [False])
|
||||
if isinstance(history, list):
|
||||
for entry in history:
|
||||
out.append(self._parse_history(entry))
|
||||
return out
|
||||
|
||||
def _parse_group(self, group: dict) -> UsenetStatus:
|
||||
# NZBGet reports sizes split into ``FileSizeLo`` (low 32 bits) +
|
||||
# ``FileSizeHi`` (high 32 bits) for compat with old clients —
|
||||
# ``FileSizeMB`` is the human-friendly aggregate.
|
||||
size_mb = self._mb_value(group, 'FileSize')
|
||||
remaining_mb = self._mb_value(group, 'RemainingSize')
|
||||
size_bytes = int(size_mb * 1024 * 1024) if size_mb else 0
|
||||
downloaded_bytes = int((size_mb - remaining_mb) * 1024 * 1024) if size_mb and remaining_mb is not None else 0
|
||||
progress = 0.0
|
||||
if size_bytes > 0:
|
||||
progress = max(0.0, min(downloaded_bytes / size_bytes, 1.0))
|
||||
# NZBGet's per-group ``DownloadRate`` field is in bytes/sec.
|
||||
speed = int(group.get('DownloadRate') or 0)
|
||||
return UsenetStatus(
|
||||
id=str(group.get('NZBID') or ''),
|
||||
name=group.get('NZBName') or '',
|
||||
state=_map_state(group.get('Status') or ''),
|
||||
progress=progress,
|
||||
size=size_bytes,
|
||||
downloaded=downloaded_bytes,
|
||||
download_speed=speed,
|
||||
save_path=group.get('DestDir'),
|
||||
category=group.get('Category'),
|
||||
)
|
||||
|
||||
def _parse_history(self, entry: dict) -> UsenetStatus:
|
||||
# History entries have ``Status`` like ``SUCCESS/HEALTH``,
|
||||
# ``SUCCESS/UNPACK``, ``FAILURE/PAR``, etc.
|
||||
status_field = entry.get('Status') or ''
|
||||
is_failed = status_field.startswith('FAILURE')
|
||||
size_mb = self._mb_value(entry, 'FileSize')
|
||||
size_bytes = int(size_mb * 1024 * 1024) if size_mb else 0
|
||||
return UsenetStatus(
|
||||
id=str(entry.get('NZBID') or ''),
|
||||
name=entry.get('Name') or entry.get('NZBName') or '',
|
||||
state='failed' if is_failed else 'completed',
|
||||
progress=0.0 if is_failed else 1.0,
|
||||
size=size_bytes,
|
||||
downloaded=size_bytes if not is_failed else 0,
|
||||
download_speed=0,
|
||||
save_path=entry.get('DestDir'),
|
||||
category=entry.get('Category'),
|
||||
error=status_field if is_failed else None,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _mb_value(entry: dict, prefix: str) -> Optional[float]:
|
||||
"""Read an NZBGet size field. Prefers the high+low 32-bit split
|
||||
when available (most accurate); falls back to the ``MB``
|
||||
aggregate for older NZBGet versions."""
|
||||
lo = entry.get(f'{prefix}Lo')
|
||||
hi = entry.get(f'{prefix}Hi')
|
||||
if isinstance(lo, int) and isinstance(hi, int):
|
||||
total_bytes = (hi << 32) | lo
|
||||
return total_bytes / (1024 * 1024)
|
||||
mb = entry.get(f'{prefix}MB')
|
||||
if isinstance(mb, (int, float)):
|
||||
return float(mb)
|
||||
return None
|
||||
|
||||
async def remove(self, job_id: str, delete_files: bool = False) -> bool:
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self._remove_sync, job_id, delete_files)
|
||||
|
||||
def _remove_sync(self, job_id: str, delete_files: bool) -> bool:
|
||||
# editqueue commands take a list of NZBIDs. ``GroupFinalDelete``
|
||||
# both removes and deletes downloaded data; ``GroupDelete`` just
|
||||
# removes the queue entry.
|
||||
try:
|
||||
id_int = int(job_id)
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
command = 'GroupFinalDelete' if delete_files else 'GroupDelete'
|
||||
# editqueue(Command, Offset, EditText, IDs)
|
||||
result = self._rpc_sync('editqueue', [command, 0, '', [id_int]])
|
||||
return bool(result)
|
||||
|
||||
async def pause(self, job_id: str) -> bool:
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self._pause_sync, job_id)
|
||||
|
||||
def _pause_sync(self, job_id: str) -> bool:
|
||||
try:
|
||||
id_int = int(job_id)
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
return bool(self._rpc_sync('editqueue', ['GroupPause', 0, '', [id_int]]))
|
||||
|
||||
async def resume(self, job_id: str) -> bool:
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self._resume_sync, job_id)
|
||||
|
||||
def _resume_sync(self, job_id: str) -> bool:
|
||||
try:
|
||||
id_int = int(job_id)
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
return bool(self._rpc_sync('editqueue', ['GroupResume', 0, '', [id_int]]))
|
||||
@ -0,0 +1,284 @@
|
||||
"""SABnzbd adapter.
|
||||
|
||||
Auth model: a single API key passed as ``?apikey=...`` on every
|
||||
request. No login flow. Every endpoint is the same path ``/api`` with
|
||||
a ``mode=`` query param.
|
||||
|
||||
Reference: https://sabnzbd.org/wiki/configuration/4.3/api
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import List, Optional, Union
|
||||
|
||||
import requests as http_requests
|
||||
|
||||
from config.settings import config_manager
|
||||
from core.usenet_clients.base import UsenetStatus
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger("usenet.sabnzbd")
|
||||
|
||||
|
||||
# SAB queue states + history states → adapter-uniform set.
|
||||
# Queue: Idle, Paused, Downloading, Grabbing, Queued, Checking,
|
||||
# QuickCheck, Verifying, Repairing, Fetching, Extracting, Moving,
|
||||
# Running, Completed, Failed.
|
||||
_SAB_QUEUE_STATE_MAP = {
|
||||
"idle": "queued",
|
||||
"queued": "queued",
|
||||
"grabbing": "queued",
|
||||
"fetching": "downloading",
|
||||
"downloading": "downloading",
|
||||
"paused": "paused",
|
||||
"checking": "verifying",
|
||||
"quickcheck": "verifying",
|
||||
"verifying": "verifying",
|
||||
"repairing": "repairing",
|
||||
"extracting": "extracting",
|
||||
"moving": "extracting",
|
||||
"running": "extracting",
|
||||
"completed": "completed",
|
||||
"failed": "failed",
|
||||
}
|
||||
|
||||
|
||||
def _map_state(sab_state: str) -> str:
|
||||
return _SAB_QUEUE_STATE_MAP.get((sab_state or "").lower(), "error")
|
||||
|
||||
|
||||
class SABnzbdAdapter:
|
||||
"""SABnzbd REST API adapter (v2+)."""
|
||||
|
||||
DEFAULT_TIMEOUT = 15
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._load_config()
|
||||
|
||||
def _load_config(self) -> None:
|
||||
self._url = (config_manager.get('usenet_client.url', '') or '').rstrip('/')
|
||||
self._api_key = config_manager.get('usenet_client.api_key', '') or ''
|
||||
self._category = config_manager.get('usenet_client.category', 'soulsync') or 'soulsync'
|
||||
|
||||
def reload_settings(self) -> None:
|
||||
self._load_config()
|
||||
|
||||
def is_configured(self) -> bool:
|
||||
return bool(self._url and self._api_key)
|
||||
|
||||
async def check_connection(self) -> bool:
|
||||
if not self.is_configured():
|
||||
return False
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self._check_connection_sync)
|
||||
|
||||
def _check_connection_sync(self) -> bool:
|
||||
# ``mode=version`` is the cheapest authenticated probe SAB exposes.
|
||||
data = self._call_sync('version')
|
||||
return bool(data and data.get('version'))
|
||||
|
||||
def _call_sync(self, mode: str, **extra) -> Optional[dict]:
|
||||
if not self.is_configured():
|
||||
return None
|
||||
params = {
|
||||
'mode': mode,
|
||||
'output': 'json',
|
||||
'apikey': self._api_key,
|
||||
}
|
||||
params.update(extra)
|
||||
try:
|
||||
resp = http_requests.get(f"{self._url}/api", params=params, timeout=self.DEFAULT_TIMEOUT)
|
||||
if not resp.ok:
|
||||
logger.warning("SABnzbd mode=%s returned HTTP %s", mode, resp.status_code)
|
||||
return None
|
||||
return resp.json()
|
||||
except http_requests.exceptions.RequestException as e:
|
||||
logger.error("SABnzbd mode=%s request failed: %s", mode, e)
|
||||
return None
|
||||
except ValueError as e:
|
||||
logger.error("SABnzbd mode=%s response was not JSON: %s", mode, e)
|
||||
return None
|
||||
|
||||
def _post_sync(self, mode: str, files=None, **extra) -> Optional[dict]:
|
||||
if not self.is_configured():
|
||||
return None
|
||||
params = {
|
||||
'mode': mode,
|
||||
'output': 'json',
|
||||
'apikey': self._api_key,
|
||||
}
|
||||
params.update(extra)
|
||||
try:
|
||||
resp = http_requests.post(f"{self._url}/api", params=params, files=files,
|
||||
timeout=self.DEFAULT_TIMEOUT)
|
||||
if not resp.ok:
|
||||
logger.warning("SABnzbd POST mode=%s returned HTTP %s", mode, resp.status_code)
|
||||
return None
|
||||
return resp.json()
|
||||
except http_requests.exceptions.RequestException as e:
|
||||
logger.error("SABnzbd POST mode=%s failed: %s", mode, e)
|
||||
return None
|
||||
except ValueError as e:
|
||||
logger.error("SABnzbd POST mode=%s response was not JSON: %s", mode, e)
|
||||
return None
|
||||
|
||||
async def add_nzb(
|
||||
self,
|
||||
url_or_bytes: Union[str, bytes],
|
||||
category: str = "soulsync",
|
||||
save_path: Optional[str] = None,
|
||||
) -> Optional[str]:
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(
|
||||
None, self._add_nzb_sync, url_or_bytes, category, save_path
|
||||
)
|
||||
|
||||
def _add_nzb_sync(
|
||||
self,
|
||||
url_or_bytes: Union[str, bytes],
|
||||
category: str,
|
||||
save_path: Optional[str],
|
||||
) -> Optional[str]:
|
||||
cat = category or self._category
|
||||
if isinstance(url_or_bytes, bytes):
|
||||
files = {'name': ('soulsync.nzb', url_or_bytes, 'application/x-nzb')}
|
||||
data = self._post_sync('addfile', files=files, cat=cat)
|
||||
else:
|
||||
data = self._call_sync('addurl', name=url_or_bytes, cat=cat)
|
||||
if not data or not data.get('status'):
|
||||
return None
|
||||
ids = data.get('nzo_ids') or []
|
||||
return ids[0] if ids else None
|
||||
|
||||
async def get_status(self, job_id: str) -> Optional[UsenetStatus]:
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self._get_status_sync, job_id)
|
||||
|
||||
def _get_status_sync(self, job_id: str) -> Optional[UsenetStatus]:
|
||||
# Check active queue first; if not found, fall back to history.
|
||||
for status in self._get_all_sync():
|
||||
if status.id == job_id:
|
||||
return status
|
||||
return None
|
||||
|
||||
async def get_all(self) -> List[UsenetStatus]:
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self._get_all_sync)
|
||||
|
||||
def _get_all_sync(self) -> List[UsenetStatus]:
|
||||
out: List[UsenetStatus] = []
|
||||
# Active queue
|
||||
queue = self._call_sync('queue')
|
||||
if queue and isinstance(queue.get('queue'), dict):
|
||||
for slot in queue['queue'].get('slots', []) or []:
|
||||
out.append(self._parse_queue_slot(slot))
|
||||
# History — completed / failed jobs SAB still tracks
|
||||
history = self._call_sync('history', limit=50)
|
||||
if history and isinstance(history.get('history'), dict):
|
||||
for slot in history['history'].get('slots', []) or []:
|
||||
out.append(self._parse_history_slot(slot))
|
||||
return out
|
||||
|
||||
def _parse_queue_slot(self, slot: dict) -> UsenetStatus:
|
||||
try:
|
||||
percentage = float(slot.get('percentage') or 0.0)
|
||||
except (TypeError, ValueError):
|
||||
percentage = 0.0
|
||||
progress = percentage / 100.0
|
||||
# mb / mbleft are strings of MB values in SAB's queue API.
|
||||
size_mb = self._safe_float(slot.get('mb'))
|
||||
left_mb = self._safe_float(slot.get('mbleft'))
|
||||
size_bytes = int(size_mb * 1024 * 1024) if size_mb else 0
|
||||
downloaded_bytes = int((size_mb - left_mb) * 1024 * 1024) if size_mb and left_mb is not None else 0
|
||||
# ``timeleft`` is HH:MM:SS — convert to seconds.
|
||||
eta = self._parse_timeleft(slot.get('timeleft'))
|
||||
return UsenetStatus(
|
||||
id=str(slot.get('nzo_id') or ''),
|
||||
name=slot.get('filename') or slot.get('name') or '',
|
||||
state=_map_state(slot.get('status') or ''),
|
||||
progress=max(0.0, min(progress, 1.0)),
|
||||
size=size_bytes,
|
||||
downloaded=max(0, downloaded_bytes),
|
||||
download_speed=0, # queue endpoint doesn't include per-slot speed
|
||||
eta=eta,
|
||||
category=slot.get('cat'),
|
||||
)
|
||||
|
||||
def _parse_history_slot(self, slot: dict) -> UsenetStatus:
|
||||
# History entries are post-download — progress is 1.0 unless failed.
|
||||
sab_state = (slot.get('status') or '').lower()
|
||||
is_failed = sab_state == 'failed'
|
||||
return UsenetStatus(
|
||||
id=str(slot.get('nzo_id') or ''),
|
||||
name=slot.get('name') or '',
|
||||
state='failed' if is_failed else 'completed',
|
||||
progress=0.0 if is_failed else 1.0,
|
||||
size=int(slot.get('bytes') or 0),
|
||||
downloaded=int(slot.get('bytes') or 0) if not is_failed else 0,
|
||||
download_speed=0,
|
||||
save_path=slot.get('storage') or slot.get('path'),
|
||||
category=slot.get('category'),
|
||||
error=slot.get('fail_message') if is_failed else None,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _safe_float(value) -> Optional[float]:
|
||||
if value is None or value == '':
|
||||
return None
|
||||
try:
|
||||
return float(value)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _parse_timeleft(value) -> Optional[int]:
|
||||
if not value or not isinstance(value, str):
|
||||
return None
|
||||
parts = value.split(':')
|
||||
try:
|
||||
if len(parts) == 3:
|
||||
h, m, s = parts
|
||||
return int(h) * 3600 + int(m) * 60 + int(s)
|
||||
if len(parts) == 2:
|
||||
m, s = parts
|
||||
return int(m) * 60 + int(s)
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
async def remove(self, job_id: str, delete_files: bool = False) -> bool:
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self._remove_sync, job_id, delete_files)
|
||||
|
||||
def _remove_sync(self, job_id: str, delete_files: bool) -> bool:
|
||||
# SAB deletes from queue or history depending on where the job is.
|
||||
# We try queue first; if SAB reports no-op, fall through to history.
|
||||
params = {'name': 'delete', 'value': job_id}
|
||||
if delete_files:
|
||||
params['del_files'] = 1
|
||||
data = self._call_sync('queue', **params)
|
||||
if data and data.get('status'):
|
||||
return True
|
||||
# History delete
|
||||
history_params = {'name': 'delete', 'value': job_id}
|
||||
if delete_files:
|
||||
history_params['del_files'] = 1
|
||||
data = self._call_sync('history', **history_params)
|
||||
return bool(data and data.get('status'))
|
||||
|
||||
async def pause(self, job_id: str) -> bool:
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self._pause_sync, job_id)
|
||||
|
||||
def _pause_sync(self, job_id: str) -> bool:
|
||||
data = self._call_sync('queue', name='pause', value=job_id)
|
||||
return bool(data and data.get('status'))
|
||||
|
||||
async def resume(self, job_id: str) -> bool:
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self._resume_sync, job_id)
|
||||
|
||||
def _resume_sync(self, job_id: str) -> bool:
|
||||
data = self._call_sync('queue', name='resume', value=job_id)
|
||||
return bool(data and data.get('status'))
|
||||
Loading…
Reference in new issue