diff --git a/core/downloads/cancel.py b/core/downloads/cancel.py new file mode 100644 index 00000000..6adecbc9 --- /dev/null +++ b/core/downloads/cancel.py @@ -0,0 +1,103 @@ +"""Download cancellation + clear helpers. + +Four discrete operations lifted from web_server.py: + +- `cancel_single_download(client, run_async, download_id, username)` — cancel + one slskd transfer. +- `cancel_all_active(client, run_async, sweep_callback)` — cancel every + active slskd transfer, then clear the now-cancelled ones, then sweep + empty download directories. +- `clear_finished_active(client, run_async, sweep_callback)` — clear all + terminal transfers from slskd (no cancel step), sweep dirs. +- `clear_completed_local()` — prune terminal-status tasks from the + local `download_tasks` tracker, drop empty batches, drop their locks. + Pure local mutation, doesn't touch slskd. + +The slskd-touching helpers take the soulseek client and run_async callback +explicitly; the local helper imports its globals directly from +`core.runtime_state` since those are module-level shared state and every +caller sees the same dict. + +Out of scope for this PR (deferred to the batch-lifecycle lift): +- `cancel_download_task` (calls _on_download_completed) +- `cancel_task_v2` + `_atomic_cancel_task` (manipulate batch active_count) +""" + +from __future__ import annotations + +import logging +from typing import Callable + +from core.runtime_state import ( + batch_locks, + download_batches, + download_tasks, + tasks_lock, +) + +logger = logging.getLogger(__name__) + +_TERMINAL_STATUSES = { + 'completed', 'failed', 'not_found', 'cancelled', 'skipped', 'already_owned', +} + + +def cancel_single_download(soulseek_client, run_async: Callable, + download_id: str, username: str) -> bool: + """Cancel one specific slskd download (with `remove=True`).""" + return run_async(soulseek_client.cancel_download(download_id, username, remove=True)) + + +def cancel_all_active(soulseek_client, run_async: Callable, + sweep_callback: Callable[[], None]) -> tuple[bool, str]: + """Cancel every active slskd download, clear the resulting ones, sweep dirs. + + Returns `(success, message)` so the route can map to the right HTTP shape. + """ + cancel_success = run_async(soulseek_client.cancel_all_downloads()) + if not cancel_success: + return False, "Failed to cancel active downloads." + + run_async(soulseek_client.clear_all_completed_downloads()) + sweep_callback() + return True, "All downloads cancelled and cleared." + + +def clear_finished_active(soulseek_client, run_async: Callable, + sweep_callback: Callable[[], None]) -> bool: + """Clear all terminal transfers from slskd, sweep dirs on success.""" + success = run_async(soulseek_client.clear_all_completed_downloads()) + if success: + sweep_callback() + return success + + +def clear_completed_local() -> int: + """Remove completed/failed/cancelled tasks from the local tracker. + + Also prunes batches whose queues are now empty, and removes the matching + `batch_locks` entry. Returns the number of cleared tasks. + """ + cleared = 0 + with tasks_lock: + task_ids_to_remove = [ + tid for tid, task in download_tasks.items() + if task.get('status') in _TERMINAL_STATUSES + ] + for tid in task_ids_to_remove: + del download_tasks[tid] + cleared += 1 + + empty_batches = [] + for bid, batch in download_batches.items(): + remaining = [t for t in batch.get('queue', []) if t in download_tasks] + if not remaining: + empty_batches.append(bid) + else: + batch['queue'] = remaining + for bid in empty_batches: + del download_batches[bid] + if bid in batch_locks: + del batch_locks[bid] + + return cleared diff --git a/core/runtime_state.py b/core/runtime_state.py index f562ad5f..f952aee1 100644 --- a/core/runtime_state.py +++ b/core/runtime_state.py @@ -12,6 +12,7 @@ matched_downloads_context: Dict[str, Dict[str, Any]] = {} tasks_lock = threading.Lock() download_tasks: Dict[str, Dict[str, Any]] = {} download_batches: Dict[str, Dict[str, Any]] = {} +batch_locks: Dict[str, threading.Lock] = {} processed_download_ids = set() post_process_locks: Dict[str, threading.Lock] = {} post_process_locks_lock = threading.Lock() diff --git a/tests/downloads/test_downloads_cancel.py b/tests/downloads/test_downloads_cancel.py new file mode 100644 index 00000000..7d90f0ee --- /dev/null +++ b/tests/downloads/test_downloads_cancel.py @@ -0,0 +1,211 @@ +"""Tests for core/downloads/cancel.py — slskd cancel + clear + local task pruning.""" + +from __future__ import annotations + +import pytest + +from core.downloads import cancel +from core.runtime_state import ( + batch_locks, + download_batches, + download_tasks, +) + + +@pytest.fixture(autouse=True) +def reset_state(): + """Each test gets clean download_tasks / download_batches / batch_locks.""" + download_tasks.clear() + download_batches.clear() + batch_locks.clear() + yield + download_tasks.clear() + download_batches.clear() + batch_locks.clear() + + +# --------------------------------------------------------------------------- +# Fakes +# --------------------------------------------------------------------------- + +class _FakeSoulseek: + def __init__(self, cancel_result=True, cancel_all_result=True, clear_result=True): + self._cancel_result = cancel_result + self._cancel_all_result = cancel_all_result + self._clear_result = clear_result + self.cancel_calls = [] + self.cancel_all_calls = 0 + self.clear_calls = 0 + + async def cancel_download(self, download_id, username, remove=False): + self.cancel_calls.append((download_id, username, remove)) + return self._cancel_result + + async def cancel_all_downloads(self): + self.cancel_all_calls += 1 + return self._cancel_all_result + + async def clear_all_completed_downloads(self): + self.clear_calls += 1 + return self._clear_result + + +def _sync_run_async(coro): + """Drain a coroutine on a fresh loop.""" + import asyncio + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +# --------------------------------------------------------------------------- +# cancel_single_download +# --------------------------------------------------------------------------- + +def test_cancel_single_passes_args_with_remove_true(): + sk = _FakeSoulseek() + result = cancel.cancel_single_download(sk, _sync_run_async, 'dl-123', 'user-x') + assert result is True + assert sk.cancel_calls == [('dl-123', 'user-x', True)] + + +def test_cancel_single_propagates_failure(): + sk = _FakeSoulseek(cancel_result=False) + result = cancel.cancel_single_download(sk, _sync_run_async, 'dl', 'u') + assert result is False + + +# --------------------------------------------------------------------------- +# cancel_all_active +# --------------------------------------------------------------------------- + +def test_cancel_all_happy_path(): + sk = _FakeSoulseek() + sweeps = [] + success, msg = cancel.cancel_all_active(sk, _sync_run_async, lambda: sweeps.append(1)) + assert success is True + assert msg == "All downloads cancelled and cleared." + assert sk.cancel_all_calls == 1 + assert sk.clear_calls == 1 + assert sweeps == [1] + + +def test_cancel_all_returns_failure_if_cancel_step_fails(): + sk = _FakeSoulseek(cancel_all_result=False) + sweeps = [] + success, msg = cancel.cancel_all_active(sk, _sync_run_async, lambda: sweeps.append(1)) + assert success is False + assert msg == "Failed to cancel active downloads." + # Clear/sweep should NOT run when cancel fails + assert sk.clear_calls == 0 + assert sweeps == [] + + +def test_cancel_all_runs_sweep_even_if_clear_returns_false(): + """Clear returning False is not a hard error — sweep still runs (matches original).""" + sk = _FakeSoulseek(clear_result=False) + sweeps = [] + success, msg = cancel.cancel_all_active(sk, _sync_run_async, lambda: sweeps.append(1)) + assert success is True + assert sweeps == [1] + + +# --------------------------------------------------------------------------- +# clear_finished_active +# --------------------------------------------------------------------------- + +def test_clear_finished_happy_path_calls_sweep(): + sk = _FakeSoulseek() + sweeps = [] + success = cancel.clear_finished_active(sk, _sync_run_async, lambda: sweeps.append(1)) + assert success is True + assert sk.clear_calls == 1 + assert sweeps == [1] + + +def test_clear_finished_failure_skips_sweep(): + sk = _FakeSoulseek(clear_result=False) + sweeps = [] + success = cancel.clear_finished_active(sk, _sync_run_async, lambda: sweeps.append(1)) + assert success is False + assert sweeps == [] + + +# --------------------------------------------------------------------------- +# clear_completed_local +# --------------------------------------------------------------------------- + +def test_clear_completed_removes_terminal_tasks(): + download_tasks['t1'] = {'status': 'completed'} + download_tasks['t2'] = {'status': 'failed'} + download_tasks['t3'] = {'status': 'downloading'} # still active + download_tasks['t4'] = {'status': 'cancelled'} + download_tasks['t5'] = {'status': 'not_found'} + download_tasks['t6'] = {'status': 'skipped'} + download_tasks['t7'] = {'status': 'already_owned'} + + cleared = cancel.clear_completed_local() + assert cleared == 6 + assert set(download_tasks.keys()) == {'t3'} + + +def test_clear_completed_keeps_searching_and_queued(): + """Active states ('searching', 'queued', 'downloading', 'pending') stay.""" + download_tasks['t1'] = {'status': 'searching'} + download_tasks['t2'] = {'status': 'queued'} + download_tasks['t3'] = {'status': 'downloading'} + download_tasks['t4'] = {'status': 'pending'} + cleared = cancel.clear_completed_local() + assert cleared == 0 + assert set(download_tasks.keys()) == {'t1', 't2', 't3', 't4'} + + +def test_clear_completed_drops_empty_batches(): + download_tasks['t1'] = {'status': 'completed'} + download_batches['b1'] = {'queue': ['t1']} # all tasks will be cleared + download_batches['b2'] = {'queue': ['t2']} # t2 doesn't exist either + download_tasks['t3'] = {'status': 'downloading'} + download_batches['b3'] = {'queue': ['t3']} # t3 stays + + cancel.clear_completed_local() + assert 'b1' not in download_batches + assert 'b2' not in download_batches + assert 'b3' in download_batches + assert download_batches['b3']['queue'] == ['t3'] + + +def test_clear_completed_prunes_terminal_task_ids_from_batch_queues(): + """Batch with mix of terminal + active tasks gets queue trimmed, not deleted.""" + download_tasks['t1'] = {'status': 'completed'} + download_tasks['t2'] = {'status': 'downloading'} + download_batches['b1'] = {'queue': ['t1', 't2']} + + cancel.clear_completed_local() + assert 'b1' in download_batches + assert download_batches['b1']['queue'] == ['t2'] + + +def test_clear_completed_drops_batch_locks_for_deleted_batches(): + import threading + download_tasks['t1'] = {'status': 'completed'} + download_batches['b1'] = {'queue': ['t1']} + batch_locks['b1'] = threading.Lock() + + cancel.clear_completed_local() + assert 'b1' not in batch_locks + + +def test_clear_completed_keeps_batch_locks_for_surviving_batches(): + import threading + download_tasks['t1'] = {'status': 'downloading'} + download_batches['b1'] = {'queue': ['t1']} + batch_locks['b1'] = threading.Lock() + + cancel.clear_completed_local() + assert 'b1' in batch_locks + + +def test_clear_completed_returns_zero_on_empty_state(): + assert cancel.clear_completed_local() == 0 diff --git a/web_server.py b/web_server.py index 42e7db91..3c5f89e0 100644 --- a/web_server.py +++ b/web_server.py @@ -794,7 +794,9 @@ transfer_data_cache = { session_completed_downloads = 0 session_stats_lock = threading.Lock() -batch_locks = {} +# `batch_locks` lives in core/runtime_state.py (re-exported here so the existing +# call sites resolve without modification). All download globals share that home. +from core.runtime_state import batch_locks _orphaned_download_keys = set() _enrichment_activity_log = {} @@ -9353,15 +9355,17 @@ def get_download_status(): +# Cancel + clear logic lives in core/downloads/cancel.py — these routes are thin handlers. +from core.downloads import cancel as _downloads_cancel + + @app.route('/api/downloads/cancel', methods=['POST']) def cancel_download(): - """ - Cancel a specific download transfer, matching GUI functionality. - """ + """Cancel a specific download transfer, matching GUI functionality.""" data = request.get_json() if not data: return jsonify({"success": False, "error": "No data provided."}), 400 - + download_id = data.get('download_id') username = data.get('username') @@ -9369,52 +9373,40 @@ def cancel_download(): return jsonify({"success": False, "error": "Missing download_id or username."}), 400 try: - # Call the same client method the GUI uses - success = run_async(soulseek_client.cancel_download(download_id, username, remove=True)) + success = _downloads_cancel.cancel_single_download(soulseek_client, run_async, download_id, username) if success: return jsonify({"success": True, "message": "Download cancelled."}) - else: - return jsonify({"success": False, "error": "Failed to cancel download via slskd."}), 500 + return jsonify({"success": False, "error": "Failed to cancel download via slskd."}), 500 except Exception as e: logger.error(f"Error cancelling download: {e}") return jsonify({"success": False, "error": str(e)}), 500 + @app.route('/api/downloads/cancel-all', methods=['POST']) def cancel_all_downloads(): - """ - Cancel all active downloads from slskd, then clear completed ones. - """ + """Cancel all active downloads from slskd, then clear completed ones.""" try: - # First cancel all active downloads - cancel_success = run_async(soulseek_client.cancel_all_downloads()) - if not cancel_success: - return jsonify({"success": False, "error": "Failed to cancel active downloads."}), 500 - - # Then clear the now-cancelled/completed downloads - clear_success = run_async(soulseek_client.clear_all_completed_downloads()) - - # Sweep empty directories - _sweep_empty_download_directories() - - return jsonify({"success": True, "message": "All downloads cancelled and cleared."}) + success, msg = _downloads_cancel.cancel_all_active( + soulseek_client, run_async, _sweep_empty_download_directories, + ) + if success: + return jsonify({"success": True, "message": msg}) + return jsonify({"success": False, "error": msg}), 500 except Exception as e: logger.error(f"Error cancelling all downloads: {e}") return jsonify({"success": False, "error": str(e)}), 500 + @app.route('/api/downloads/clear-finished', methods=['POST']) def clear_finished_downloads(): - """ - Clear all terminal (completed, cancelled, failed) downloads from slskd. - """ + """Clear all terminal (completed, cancelled, failed) downloads from slskd.""" try: - # This single client call handles clearing everything that is no longer active - success = run_async(soulseek_client.clear_all_completed_downloads()) + success = _downloads_cancel.clear_finished_active( + soulseek_client, run_async, _sweep_empty_download_directories, + ) if success: - # Also sweep empty directories left behind by completed downloads - _sweep_empty_download_directories() return jsonify({"success": True, "message": "Finished downloads cleared."}) - else: - return jsonify({"success": False, "error": "Backend failed to clear downloads."}), 500 + return jsonify({"success": False, "error": "Backend failed to clear downloads."}), 500 except Exception as e: logger.error(f"Error clearing finished downloads: {e}") return jsonify({"success": False, "error": str(e)}), 500 @@ -24646,29 +24638,7 @@ def get_batch_history(): def clear_completed_downloads(): """Remove completed/failed/cancelled tasks from the download tracker.""" try: - terminal_statuses = {'completed', 'failed', 'not_found', 'cancelled', 'skipped', 'already_owned'} - cleared = 0 - with tasks_lock: - task_ids_to_remove = [ - tid for tid, task in download_tasks.items() - if task.get('status') in terminal_statuses - ] - for tid in task_ids_to_remove: - del download_tasks[tid] - cleared += 1 - # Also clean up empty batches - empty_batches = [] - for bid, batch in download_batches.items(): - remaining = [t for t in batch.get('queue', []) if t in download_tasks] - if not remaining: - empty_batches.append(bid) - else: - batch['queue'] = remaining - for bid in empty_batches: - del download_batches[bid] - if bid in batch_locks: - del batch_locks[bid] - + cleared = _downloads_cancel.clear_completed_local() return jsonify({'success': True, 'cleared': cleared}) except Exception as e: logger.error(f"Error clearing completed downloads: {e}")