Merge pull request #395 from Nezreka/refactor/lift-downloads-cancel

PR4b: lift cancel + clear download routes to core/downloads/cancel.py
pull/396/head
BoulderBadgeDad 4 weeks ago committed by GitHub
commit b02a06782e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,103 @@
"""Download cancellation + clear helpers.
Four discrete operations lifted from web_server.py:
- `cancel_single_download(client, run_async, download_id, username)` cancel
one slskd transfer.
- `cancel_all_active(client, run_async, sweep_callback)` cancel every
active slskd transfer, then clear the now-cancelled ones, then sweep
empty download directories.
- `clear_finished_active(client, run_async, sweep_callback)` clear all
terminal transfers from slskd (no cancel step), sweep dirs.
- `clear_completed_local()` prune terminal-status tasks from the
local `download_tasks` tracker, drop empty batches, drop their locks.
Pure local mutation, doesn't touch slskd.
The slskd-touching helpers take the soulseek client and run_async callback
explicitly; the local helper imports its globals directly from
`core.runtime_state` since those are module-level shared state and every
caller sees the same dict.
Out of scope for this PR (deferred to the batch-lifecycle lift):
- `cancel_download_task` (calls _on_download_completed)
- `cancel_task_v2` + `_atomic_cancel_task` (manipulate batch active_count)
"""
from __future__ import annotations
import logging
from typing import Callable
from core.runtime_state import (
batch_locks,
download_batches,
download_tasks,
tasks_lock,
)
logger = logging.getLogger(__name__)
_TERMINAL_STATUSES = {
'completed', 'failed', 'not_found', 'cancelled', 'skipped', 'already_owned',
}
def cancel_single_download(soulseek_client, run_async: Callable,
download_id: str, username: str) -> bool:
"""Cancel one specific slskd download (with `remove=True`)."""
return run_async(soulseek_client.cancel_download(download_id, username, remove=True))
def cancel_all_active(soulseek_client, run_async: Callable,
sweep_callback: Callable[[], None]) -> tuple[bool, str]:
"""Cancel every active slskd download, clear the resulting ones, sweep dirs.
Returns `(success, message)` so the route can map to the right HTTP shape.
"""
cancel_success = run_async(soulseek_client.cancel_all_downloads())
if not cancel_success:
return False, "Failed to cancel active downloads."
run_async(soulseek_client.clear_all_completed_downloads())
sweep_callback()
return True, "All downloads cancelled and cleared."
def clear_finished_active(soulseek_client, run_async: Callable,
sweep_callback: Callable[[], None]) -> bool:
"""Clear all terminal transfers from slskd, sweep dirs on success."""
success = run_async(soulseek_client.clear_all_completed_downloads())
if success:
sweep_callback()
return success
def clear_completed_local() -> int:
"""Remove completed/failed/cancelled tasks from the local tracker.
Also prunes batches whose queues are now empty, and removes the matching
`batch_locks` entry. Returns the number of cleared tasks.
"""
cleared = 0
with tasks_lock:
task_ids_to_remove = [
tid for tid, task in download_tasks.items()
if task.get('status') in _TERMINAL_STATUSES
]
for tid in task_ids_to_remove:
del download_tasks[tid]
cleared += 1
empty_batches = []
for bid, batch in download_batches.items():
remaining = [t for t in batch.get('queue', []) if t in download_tasks]
if not remaining:
empty_batches.append(bid)
else:
batch['queue'] = remaining
for bid in empty_batches:
del download_batches[bid]
if bid in batch_locks:
del batch_locks[bid]
return cleared

@ -12,6 +12,7 @@ matched_downloads_context: Dict[str, Dict[str, Any]] = {}
tasks_lock = threading.Lock()
download_tasks: Dict[str, Dict[str, Any]] = {}
download_batches: Dict[str, Dict[str, Any]] = {}
batch_locks: Dict[str, threading.Lock] = {}
processed_download_ids = set()
post_process_locks: Dict[str, threading.Lock] = {}
post_process_locks_lock = threading.Lock()

@ -0,0 +1,211 @@
"""Tests for core/downloads/cancel.py — slskd cancel + clear + local task pruning."""
from __future__ import annotations
import pytest
from core.downloads import cancel
from core.runtime_state import (
batch_locks,
download_batches,
download_tasks,
)
@pytest.fixture(autouse=True)
def reset_state():
"""Each test gets clean download_tasks / download_batches / batch_locks."""
download_tasks.clear()
download_batches.clear()
batch_locks.clear()
yield
download_tasks.clear()
download_batches.clear()
batch_locks.clear()
# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------
class _FakeSoulseek:
def __init__(self, cancel_result=True, cancel_all_result=True, clear_result=True):
self._cancel_result = cancel_result
self._cancel_all_result = cancel_all_result
self._clear_result = clear_result
self.cancel_calls = []
self.cancel_all_calls = 0
self.clear_calls = 0
async def cancel_download(self, download_id, username, remove=False):
self.cancel_calls.append((download_id, username, remove))
return self._cancel_result
async def cancel_all_downloads(self):
self.cancel_all_calls += 1
return self._cancel_all_result
async def clear_all_completed_downloads(self):
self.clear_calls += 1
return self._clear_result
def _sync_run_async(coro):
"""Drain a coroutine on a fresh loop."""
import asyncio
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
# ---------------------------------------------------------------------------
# cancel_single_download
# ---------------------------------------------------------------------------
def test_cancel_single_passes_args_with_remove_true():
sk = _FakeSoulseek()
result = cancel.cancel_single_download(sk, _sync_run_async, 'dl-123', 'user-x')
assert result is True
assert sk.cancel_calls == [('dl-123', 'user-x', True)]
def test_cancel_single_propagates_failure():
sk = _FakeSoulseek(cancel_result=False)
result = cancel.cancel_single_download(sk, _sync_run_async, 'dl', 'u')
assert result is False
# ---------------------------------------------------------------------------
# cancel_all_active
# ---------------------------------------------------------------------------
def test_cancel_all_happy_path():
sk = _FakeSoulseek()
sweeps = []
success, msg = cancel.cancel_all_active(sk, _sync_run_async, lambda: sweeps.append(1))
assert success is True
assert msg == "All downloads cancelled and cleared."
assert sk.cancel_all_calls == 1
assert sk.clear_calls == 1
assert sweeps == [1]
def test_cancel_all_returns_failure_if_cancel_step_fails():
sk = _FakeSoulseek(cancel_all_result=False)
sweeps = []
success, msg = cancel.cancel_all_active(sk, _sync_run_async, lambda: sweeps.append(1))
assert success is False
assert msg == "Failed to cancel active downloads."
# Clear/sweep should NOT run when cancel fails
assert sk.clear_calls == 0
assert sweeps == []
def test_cancel_all_runs_sweep_even_if_clear_returns_false():
"""Clear returning False is not a hard error — sweep still runs (matches original)."""
sk = _FakeSoulseek(clear_result=False)
sweeps = []
success, msg = cancel.cancel_all_active(sk, _sync_run_async, lambda: sweeps.append(1))
assert success is True
assert sweeps == [1]
# ---------------------------------------------------------------------------
# clear_finished_active
# ---------------------------------------------------------------------------
def test_clear_finished_happy_path_calls_sweep():
sk = _FakeSoulseek()
sweeps = []
success = cancel.clear_finished_active(sk, _sync_run_async, lambda: sweeps.append(1))
assert success is True
assert sk.clear_calls == 1
assert sweeps == [1]
def test_clear_finished_failure_skips_sweep():
sk = _FakeSoulseek(clear_result=False)
sweeps = []
success = cancel.clear_finished_active(sk, _sync_run_async, lambda: sweeps.append(1))
assert success is False
assert sweeps == []
# ---------------------------------------------------------------------------
# clear_completed_local
# ---------------------------------------------------------------------------
def test_clear_completed_removes_terminal_tasks():
download_tasks['t1'] = {'status': 'completed'}
download_tasks['t2'] = {'status': 'failed'}
download_tasks['t3'] = {'status': 'downloading'} # still active
download_tasks['t4'] = {'status': 'cancelled'}
download_tasks['t5'] = {'status': 'not_found'}
download_tasks['t6'] = {'status': 'skipped'}
download_tasks['t7'] = {'status': 'already_owned'}
cleared = cancel.clear_completed_local()
assert cleared == 6
assert set(download_tasks.keys()) == {'t3'}
def test_clear_completed_keeps_searching_and_queued():
"""Active states ('searching', 'queued', 'downloading', 'pending') stay."""
download_tasks['t1'] = {'status': 'searching'}
download_tasks['t2'] = {'status': 'queued'}
download_tasks['t3'] = {'status': 'downloading'}
download_tasks['t4'] = {'status': 'pending'}
cleared = cancel.clear_completed_local()
assert cleared == 0
assert set(download_tasks.keys()) == {'t1', 't2', 't3', 't4'}
def test_clear_completed_drops_empty_batches():
download_tasks['t1'] = {'status': 'completed'}
download_batches['b1'] = {'queue': ['t1']} # all tasks will be cleared
download_batches['b2'] = {'queue': ['t2']} # t2 doesn't exist either
download_tasks['t3'] = {'status': 'downloading'}
download_batches['b3'] = {'queue': ['t3']} # t3 stays
cancel.clear_completed_local()
assert 'b1' not in download_batches
assert 'b2' not in download_batches
assert 'b3' in download_batches
assert download_batches['b3']['queue'] == ['t3']
def test_clear_completed_prunes_terminal_task_ids_from_batch_queues():
"""Batch with mix of terminal + active tasks gets queue trimmed, not deleted."""
download_tasks['t1'] = {'status': 'completed'}
download_tasks['t2'] = {'status': 'downloading'}
download_batches['b1'] = {'queue': ['t1', 't2']}
cancel.clear_completed_local()
assert 'b1' in download_batches
assert download_batches['b1']['queue'] == ['t2']
def test_clear_completed_drops_batch_locks_for_deleted_batches():
import threading
download_tasks['t1'] = {'status': 'completed'}
download_batches['b1'] = {'queue': ['t1']}
batch_locks['b1'] = threading.Lock()
cancel.clear_completed_local()
assert 'b1' not in batch_locks
def test_clear_completed_keeps_batch_locks_for_surviving_batches():
import threading
download_tasks['t1'] = {'status': 'downloading'}
download_batches['b1'] = {'queue': ['t1']}
batch_locks['b1'] = threading.Lock()
cancel.clear_completed_local()
assert 'b1' in batch_locks
def test_clear_completed_returns_zero_on_empty_state():
assert cancel.clear_completed_local() == 0

@ -794,7 +794,9 @@ transfer_data_cache = {
session_completed_downloads = 0
session_stats_lock = threading.Lock()
batch_locks = {}
# `batch_locks` lives in core/runtime_state.py (re-exported here so the existing
# call sites resolve without modification). All download globals share that home.
from core.runtime_state import batch_locks
_orphaned_download_keys = set()
_enrichment_activity_log = {}
@ -9353,15 +9355,17 @@ def get_download_status():
# Cancel + clear logic lives in core/downloads/cancel.py — these routes are thin handlers.
from core.downloads import cancel as _downloads_cancel
@app.route('/api/downloads/cancel', methods=['POST'])
def cancel_download():
"""
Cancel a specific download transfer, matching GUI functionality.
"""
"""Cancel a specific download transfer, matching GUI functionality."""
data = request.get_json()
if not data:
return jsonify({"success": False, "error": "No data provided."}), 400
download_id = data.get('download_id')
username = data.get('username')
@ -9369,52 +9373,40 @@ def cancel_download():
return jsonify({"success": False, "error": "Missing download_id or username."}), 400
try:
# Call the same client method the GUI uses
success = run_async(soulseek_client.cancel_download(download_id, username, remove=True))
success = _downloads_cancel.cancel_single_download(soulseek_client, run_async, download_id, username)
if success:
return jsonify({"success": True, "message": "Download cancelled."})
else:
return jsonify({"success": False, "error": "Failed to cancel download via slskd."}), 500
return jsonify({"success": False, "error": "Failed to cancel download via slskd."}), 500
except Exception as e:
logger.error(f"Error cancelling download: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/downloads/cancel-all', methods=['POST'])
def cancel_all_downloads():
"""
Cancel all active downloads from slskd, then clear completed ones.
"""
"""Cancel all active downloads from slskd, then clear completed ones."""
try:
# First cancel all active downloads
cancel_success = run_async(soulseek_client.cancel_all_downloads())
if not cancel_success:
return jsonify({"success": False, "error": "Failed to cancel active downloads."}), 500
# Then clear the now-cancelled/completed downloads
clear_success = run_async(soulseek_client.clear_all_completed_downloads())
# Sweep empty directories
_sweep_empty_download_directories()
return jsonify({"success": True, "message": "All downloads cancelled and cleared."})
success, msg = _downloads_cancel.cancel_all_active(
soulseek_client, run_async, _sweep_empty_download_directories,
)
if success:
return jsonify({"success": True, "message": msg})
return jsonify({"success": False, "error": msg}), 500
except Exception as e:
logger.error(f"Error cancelling all downloads: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/downloads/clear-finished', methods=['POST'])
def clear_finished_downloads():
"""
Clear all terminal (completed, cancelled, failed) downloads from slskd.
"""
"""Clear all terminal (completed, cancelled, failed) downloads from slskd."""
try:
# This single client call handles clearing everything that is no longer active
success = run_async(soulseek_client.clear_all_completed_downloads())
success = _downloads_cancel.clear_finished_active(
soulseek_client, run_async, _sweep_empty_download_directories,
)
if success:
# Also sweep empty directories left behind by completed downloads
_sweep_empty_download_directories()
return jsonify({"success": True, "message": "Finished downloads cleared."})
else:
return jsonify({"success": False, "error": "Backend failed to clear downloads."}), 500
return jsonify({"success": False, "error": "Backend failed to clear downloads."}), 500
except Exception as e:
logger.error(f"Error clearing finished downloads: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@ -24646,29 +24638,7 @@ def get_batch_history():
def clear_completed_downloads():
"""Remove completed/failed/cancelled tasks from the download tracker."""
try:
terminal_statuses = {'completed', 'failed', 'not_found', 'cancelled', 'skipped', 'already_owned'}
cleared = 0
with tasks_lock:
task_ids_to_remove = [
tid for tid, task in download_tasks.items()
if task.get('status') in terminal_statuses
]
for tid in task_ids_to_remove:
del download_tasks[tid]
cleared += 1
# Also clean up empty batches
empty_batches = []
for bid, batch in download_batches.items():
remaining = [t for t in batch.get('queue', []) if t in download_tasks]
if not remaining:
empty_batches.append(bid)
else:
batch['queue'] = remaining
for bid in empty_batches:
del download_batches[bid]
if bid in batch_locks:
del batch_locks[bid]
cleared = _downloads_cancel.clear_completed_local()
return jsonify({'success': True, 'cleared': cleared})
except Exception as e:
logger.error(f"Error clearing completed downloads: {e}")

Loading…
Cancel
Save