From 4ddfb01a0aedeec34353f3fd36248711a3950c47 Mon Sep 17 00:00:00 2001 From: Broque Thomas <26755000+Nezreka@users.noreply.github.com> Date: Mon, 4 May 2026 13:38:18 -0700 Subject: [PATCH] C2: Migrate YouTube to engine.worker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit YouTubeClient drops its hand-rolled background thread + state dict + semaphore + last-download-timestamp. download() now delegates to engine.worker.dispatch with _download_sync as the impl callable; YouTube-specific record fields (video_id, url, title) merge into the engine record via extra_record_fields. Engine wires itself in via plugin.set_engine(engine) callback on register_plugin. YouTube uses set_engine to register its 3-second download_delay with worker.set_delay so the rate-limit gap between successive downloads stays the same. Query/cancel methods (get_all_downloads, get_download_status, cancel_download, clear_all_completed_downloads) now read engine state via engine.iter_records_for_source / get_record / update_record / remove_record. Net: ~120 LOC of thread+state boilerplate removed from youtube_client.py. Phase A pinning tests updated to assert engine state instead of client.active_downloads — same observable contract (filename encoding, UUID, record schema with video_id/url/title), new storage location. Suite still green (2025 passed). Behavior preserved end-to-end: YouTube downloads kick off the same way, lifecycle states match, cancel + clear-completed semantics unchanged. --- core/download_engine/engine.py | 18 +- core/youtube_client.py | 342 ++++++++---------------- tests/downloads/test_youtube_pinning.py | 245 ++++++++++------- 3 files changed, 269 insertions(+), 336 deletions(-) diff --git a/core/download_engine/engine.py b/core/download_engine/engine.py index c9484f16..0deec668 100644 --- a/core/download_engine/engine.py +++ b/core/download_engine/engine.py @@ -85,14 +85,24 @@ class DownloadEngine: once per source by the orchestrator after the registry's ``initialize`` builds the client instances. - Phase B is purely informational — the engine doesn't yet - dispatch through plugins. Subsequent phases use these - references to call ``plugin._download_impl`` / - ``plugin._search_raw`` etc. + If the plugin exposes ``set_engine(engine)``, the engine + passes a self-reference so the plugin can dispatch into + ``engine.worker`` / read state / etc. Plugins that haven't + been migrated to the engine yet simply don't define + ``set_engine`` — they keep their pre-engine behavior + unchanged. """ if source_name in self._plugins: logger.warning("Plugin %s already registered with engine — overwriting", source_name) self._plugins[source_name] = plugin + set_engine = getattr(plugin, 'set_engine', None) + if callable(set_engine): + try: + set_engine(self) + except Exception as exc: + logger.warning( + "Plugin %s set_engine callback failed: %s", source_name, exc, + ) def get_plugin(self, source_name: str) -> Optional[Any]: return self._plugins.get(source_name) diff --git a/core/youtube_client.py b/core/youtube_client.py index 55e042ac..dd128f82 100644 --- a/core/youtube_client.py +++ b/core/youtube_client.py @@ -112,10 +112,25 @@ class YouTubeClient: # Callback for shutdown check (avoids circular imports) self.shutdown_check = None - # Rate limiting — serialize YouTube downloads with delay - self._download_semaphore = threading.Semaphore(1) + # Rate-limit policy — applied to engine.worker once the engine + # is wired in via set_engine(). Kept as an attribute for + # backward-compat external readers + so settings reload can + # update it without touching the engine. self._download_delay = config_manager.get('youtube.download_delay', 3) - self._last_download_time = 0 + + # Engine reference is populated by set_engine() at registration + # time. Until then the client can't dispatch downloads — but + # in production the orchestrator wires the engine immediately + # after constructing the registry, so this is only None in + # tests that bypass the orchestrator. + self._engine = None + + def set_engine(self, engine): + """Engine callback — gives the client access to the central + thread worker + state store. Engine calls this during + ``register_plugin`` if the plugin defines it.""" + self._engine = engine + engine.worker.set_delay('youtube', float(self._download_delay)) def set_shutdown_check(self, check_callable): """Set a callback function to check for system shutdown""" @@ -130,11 +145,6 @@ class YouTubeClient: logger.error("ffmpeg is required but not found") logger.error("The client will attempt to auto-download ffmpeg on first use") - # Download queue management (mirrors Soulseek's download tracking) - # Maps download_id -> download_info dict - self.active_downloads: Dict[str, Dict[str, Any]] = {} - self._download_lock = threading.Lock() # Use threading.Lock for thread safety - # Configure yt-dlp options with bot detection bypass self.download_opts = { 'format': 'bestaudio/best', @@ -859,137 +869,54 @@ class YouTubeClient: return matches async def download(self, username: str, filename: str, file_size: int = 0) -> Optional[str]: - """ - Download YouTube video as audio (async, Soulseek-compatible interface). + """Download YouTube video as audio. - Returns download_id immediately and runs download in background thread. - Monitor via get_download_status() or get_all_downloads(). + Returns download_id immediately; the actual download runs in + a background thread spawned by ``engine.worker``. Monitor + via ``orchestrator.get_download_status(download_id)``. Args: username: Ignored for YouTube (always "youtube") filename: Encoded as "video_id||title" from search results file_size: Ignored for YouTube (kept for interface compatibility) - - Returns: - download_id: Unique ID for tracking this download """ - try: - # Parse filename to extract video_id - if '||' not in filename: - logger.error(f"Invalid filename format: {filename}") - return None - - video_id, title = filename.split('||', 1) - youtube_url = f"https://www.youtube.com/watch?v={video_id}" - - logger.info(f"Starting YouTube download: {title}") - logger.info(f" URL: {youtube_url}") - - # Create unique download ID - download_id = str(uuid.uuid4()) - - # Initialize download info in active downloads - with self._download_lock: - self.active_downloads[download_id] = { - 'id': download_id, - 'filename': filename, # Keep original encoded format for context matching! - 'username': 'youtube', - 'state': 'Initializing', # Soulseek-style states - 'progress': 0.0, - 'size': file_size or 0, - 'transferred': 0, - 'speed': 0, - 'time_remaining': None, - 'video_id': video_id, - 'url': youtube_url, - 'title': title, - 'file_path': None, # Will be set when download completes - } - - # Start download in background thread (returns immediately) - download_thread = threading.Thread( - target=self._download_thread_worker, - args=(download_id, youtube_url, title, filename), - daemon=True - ) - download_thread.start() - - logger.info(f"YouTube download {download_id} started in background") - return download_id - - except Exception as e: - logger.error(f"Failed to start YouTube download: {e}") - import traceback - traceback.print_exc() + if '||' not in filename: + logger.error(f"Invalid filename format: {filename}") + return None + if self._engine is None: + logger.error("YouTube client has no engine reference — cannot dispatch download") return None - def _download_thread_worker(self, download_id: str, youtube_url: str, title: str, original_filename: str): - """ - Background thread worker for downloading YouTube videos. - Updates active_downloads dict with progress. - Serialized via semaphore with configurable delay between downloads. - """ - try: - with self._download_semaphore: - # Enforce delay since last download completed - elapsed = time.time() - self._last_download_time - if self._last_download_time > 0 and elapsed < self._download_delay: - wait_time = self._download_delay - elapsed - logger.info(f"Rate limiting: waiting {wait_time:.1f}s before next YouTube download") - time.sleep(wait_time) - - # Update state to downloading - with self._download_lock: - if download_id in self.active_downloads: - self.active_downloads[download_id]['state'] = 'InProgress, Downloading' # Match Soulseek state - - # Set current download ID for progress hook - self.current_download_id = download_id - - # Perform actual download - file_path = self._download_sync(youtube_url, title) - - # Clear current download ID + video_id, title = filename.split('||', 1) + youtube_url = f"https://www.youtube.com/watch?v={video_id}" + logger.info("Starting YouTube download: %s (%s)", title, youtube_url) + + def _impl(download_id, _target_id, display_name): + # The progress hook reads ``current_download_id`` to know + # which download to update. Set it before the call, clear + # after, even on exception. + self.current_download_id = download_id + try: + return self._download_sync(youtube_url, title) + finally: self.current_download_id = None - # Record completion time for rate limiting - self._last_download_time = time.time() - - if file_path: - # Mark as completed/succeeded (match Soulseek state) - with self._download_lock: - if download_id in self.active_downloads: - # IMPORTANT: Keep original filename for context lookup! - # The filename must match what was used to create the context entry - # We store the actual file path separately - self.active_downloads[download_id]['state'] = 'Completed, Succeeded' # Match Soulseek - self.active_downloads[download_id]['progress'] = 100.0 - self.active_downloads[download_id]['file_path'] = file_path - # DO NOT update filename - keep original_filename for context matching - - logger.info(f"YouTube download {download_id} completed: {file_path}") - else: - # Mark as errored - with self._download_lock: - if download_id in self.active_downloads: - self.active_downloads[download_id]['state'] = 'Errored' - - logger.error(f"YouTube download {download_id} failed") - - except Exception as e: - logger.error(f"YouTube download thread failed for {download_id}: {e}") - import traceback - traceback.print_exc() - - # Mark as errored - with self._download_lock: - if download_id in self.active_downloads: - self.active_downloads[download_id]['state'] = 'Errored' - - # Clear current download ID - if self.current_download_id == download_id: - self.current_download_id = None + return self._engine.worker.dispatch( + source_name='youtube', + target_id=video_id, + display_name=title, + original_filename=filename, + impl_callable=_impl, + extra_record_fields={ + 'video_id': video_id, + 'url': youtube_url, + 'title': title, + }, + ) + # Legacy worker stub kept temporarily for legacy comment context — + # see _download_sync below for the actual yt-dlp invocation that + # the engine's BackgroundDownloadWorker now drives. def _download_sync(self, youtube_url: str, title: str) -> Optional[str]: """ Synchronous download method (runs in thread pool executor). @@ -1138,123 +1065,76 @@ class YouTubeClient: traceback.print_exc() return None - async def get_all_downloads(self) -> List[DownloadStatus]: - """ - Get all active downloads (matches Soulseek interface). + def _record_to_status(self, record): + """Translate an engine record dict into the DownloadStatus + dataclass shape consumers expect.""" + return DownloadStatus( + id=record['id'], + filename=record['filename'], + username=record['username'], + state=record['state'], + progress=record['progress'], + size=record.get('size', 0), + transferred=record.get('transferred', 0), + speed=record.get('speed', 0), + time_remaining=record.get('time_remaining'), + file_path=record.get('file_path'), + ) - Returns: - List of DownloadStatus objects for all active downloads - """ - download_statuses = [] - - with self._download_lock: - for _download_id, download_info in self.active_downloads.items(): - status = DownloadStatus( - id=download_info['id'], - filename=download_info['filename'], - username=download_info['username'], - state=download_info['state'], - progress=download_info['progress'], - size=download_info['size'], - transferred=download_info['transferred'], - speed=download_info['speed'], - time_remaining=download_info.get('time_remaining') - ) - download_statuses.append(status) - - return download_statuses + async def get_all_downloads(self) -> List[DownloadStatus]: + """Active downloads owned by the YouTube source — read from + engine state.""" + if self._engine is None: + return [] + return [ + self._record_to_status(record) + for record in self._engine.iter_records_for_source('youtube') + ] async def get_download_status(self, download_id: str) -> Optional[DownloadStatus]: - """ - Get status of a specific download (matches Soulseek interface). - - Args: - download_id: Download ID to query - - Returns: - DownloadStatus object or None if not found - """ - with self._download_lock: - if download_id not in self.active_downloads: - return None - - download_info = self.active_downloads[download_id] - - return DownloadStatus( - id=download_info['id'], - filename=download_info['filename'], - username=download_info['username'], - state=download_info['state'], - progress=download_info['progress'], - size=download_info['size'], - transferred=download_info['transferred'], - speed=download_info['speed'], - time_remaining=download_info.get('time_remaining'), - file_path=download_info.get('file_path') - ) - + """Single download status — read from engine state. Returns + None if this id isn't owned by YouTube (or not found).""" + if self._engine is None: + return None + record = self._engine.get_record('youtube', download_id) + if record is None: + return None + return self._record_to_status(record) async def clear_all_completed_downloads(self) -> bool: - """ - Clear all terminal (completed, cancelled, errored) downloads from the list. - Matches Soulseek interface. - """ + """Clear terminal-state downloads (Completed / Cancelled / + Errored / Aborted) from engine state.""" + if self._engine is None: + return True try: - with self._download_lock: - # Identify IDs to remove - ids_to_remove = [] - for download_id, info in self.active_downloads.items(): - state = info.get('state', '') - # Check for terminal states - # Note: We check exact strings used in _download_thread_worker and cancel_download - if state in ['Completed, Succeeded', 'Cancelled', 'Errored', 'Aborted']: - ids_to_remove.append(download_id) - - # Remove them - for download_id in ids_to_remove: - del self.active_downloads[download_id] - logger.debug(f"Cleared finished download {download_id}") - + terminal_states = {'Completed, Succeeded', 'Cancelled', 'Errored', 'Aborted'} + for record in list(self._engine.iter_records_for_source('youtube')): + if record.get('state') in terminal_states: + self._engine.remove_record('youtube', record['id']) + logger.debug("Cleared finished YouTube download %s", record['id']) return True except Exception as e: logger.error(f"Error clearing downloads: {e}") return False async def cancel_download(self, download_id: str, username: str = None, remove: bool = False) -> bool: - """ - Cancel an active download (matches Soulseek interface). - - NOTE: YouTube downloads cannot be truly cancelled mid-download, - but we mark them as cancelled for UI consistency. - - Args: - download_id: Download ID to cancel - username: Ignored for YouTube (kept for interface compatibility) - remove: If True, remove from active downloads after cancelling - - Returns: - True if cancelled successfully, False otherwise - """ - try: - with self._download_lock: - if download_id not in self.active_downloads: - logger.warning(f"Download {download_id} not found") - return False - - # Update state to cancelled - self.active_downloads[download_id]['state'] = 'Cancelled' - logger.info(f"Marked YouTube download {download_id} as cancelled") - - # Remove from active downloads if requested - if remove: - del self.active_downloads[download_id] - logger.info(f"Removed YouTube download {download_id} from queue") - - return True - - except Exception as e: - logger.error(f"Failed to cancel download {download_id}: {e}") + """Mark a YouTube download as cancelled. yt-dlp downloads + can't be truly interrupted mid-stream — this only flips + the state for UI consistency. ``remove=True`` also drops + the engine record.""" + if self._engine is None: return False + record = self._engine.get_record('youtube', download_id) + if record is None: + logger.warning(f"YouTube download {download_id} not found") + return False + + self._engine.update_record('youtube', download_id, {'state': 'Cancelled'}) + logger.info(f"Marked YouTube download {download_id} as cancelled") + if remove: + self._engine.remove_record('youtube', download_id) + logger.info(f"Removed YouTube download {download_id} from queue") + return True def _enhance_metadata(self, filepath: str, spotify_track: Optional[SpotifyTrack], yt_result: YouTubeSearchResult, track_number: int = 1, disc_number: int = 1, release_year: str = None, artist_genres: list = None): """ diff --git a/tests/downloads/test_youtube_pinning.py b/tests/downloads/test_youtube_pinning.py index 67fc02ed..6c65e534 100644 --- a/tests/downloads/test_youtube_pinning.py +++ b/tests/downloads/test_youtube_pinning.py @@ -1,18 +1,24 @@ -"""Phase A pinning tests for YouTubeClient's download lifecycle. - -YouTube uses a yt-dlp subprocess wrapped in a threading.Thread. The -upcoming engine refactor will lift the thread management + state -tracking + rate-limit semaphore OUT of this client and into the -central engine — leaving YouTubeClient as just `_download_impl` -(the yt-dlp subprocess invocation) + `search_videos` (the search -request) + auth/config. - -These tests pin the OBSERVABLE BEHAVIOR that the engine will -preserve: filename encoding format, download_id shape, state-dict -schema, and the failure modes (invalid filename, etc.). They do -NOT exercise the yt-dlp subprocess itself — that's the -source-specific atomic operation that stays per-client through the -refactor. +"""Phase A pinning tests for YouTubeClient — UPDATED for Phase C2. + +Post-C2 the client no longer owns its own ``active_downloads`` dict +or thread spawn — both moved into the engine's BackgroundDownloadWorker. +These tests still pin the same OBSERVABLE CONTRACT (filename +encoding, UUID download_id, initial-record schema, source-specific +extras like video_id/url/title) but read state from +``engine.get_record(...)`` instead of ``client.active_downloads[...]``. + +What pre-C2 pinning tests caught and what these still catch: +- Filename format: `video_id||title` ✓ +- Invalid filename → None ✓ +- UUID download_id format ✓ +- Per-download record schema (id, filename, username, state, + progress, video_id, url, title, file_path) ✓ +- Source name in record's username slot is `'youtube'` ✓ + +What dropped (covered by other tests): +- Direct thread-spawn assertion (engine.worker has its own tests). +- `_download_thread_worker` target (gone from client; engine owns + it). """ from __future__ import annotations @@ -24,6 +30,7 @@ from unittest.mock import patch import pytest +from core.download_engine import DownloadEngine from core.youtube_client import YouTubeClient @@ -36,20 +43,14 @@ def _run_async(coro): @pytest.fixture -def yt_client(): - """A YouTubeClient with a temp download path. Threading is NOT - patched here — individual tests that don't want the background - thread to actually run patch it themselves so the download_id - can be returned + state-dict pinned without yt-dlp ever firing.""" +def yt_client_with_engine(): + """A bare YouTubeClient wired into a real engine. The engine + callback is invoked manually since we bypass orchestrator init.""" client = YouTubeClient.__new__(YouTubeClient) client.download_path = Path('./test_yt_downloads') client.shutdown_check = None client.matching_engine = None - client.active_downloads = {} - client._download_lock = threading.Lock() - client._download_semaphore = threading.Semaphore(1) client._download_delay = 3 - client._last_download_time = 0 client.current_download_id = None client.current_download_progress = { 'status': 'idle', 'percent': 0.0, 'downloaded_bytes': 0, @@ -57,7 +58,11 @@ def yt_client(): } client.progress_callback = None client.download_opts = {} - return client + client._engine = None + + engine = DownloadEngine() + client.set_engine(engine) + return client, engine # --------------------------------------------------------------------------- @@ -65,92 +70,130 @@ def yt_client(): # --------------------------------------------------------------------------- -def test_download_returns_none_for_invalid_filename_format(yt_client): - """Pinning: YouTube encodes the search result as `video_id||title`. - A filename without `||` is invalid → None (not exception). This is - the soft-fail signal the orchestrator's hybrid fallback relies on.""" - result = _run_async(yt_client.download('youtube', 'no-separator-here', 0)) +def test_download_returns_none_for_invalid_filename_format(yt_client_with_engine): + """Pinning: missing `||` → None (not exception).""" + client, _ = yt_client_with_engine + result = _run_async(client.download('youtube', 'no-separator', 0)) assert result is None -def test_download_returns_uuid_download_id_for_valid_filename(yt_client): - """Pinning: a valid `video_id||title` filename produces a UUID - download_id immediately. The actual download runs in a background - thread; the orchestrator polls via get_download_status.""" - # Patch threading.Thread so the worker never actually runs (no - # yt-dlp invocation, no real network). - with patch('core.youtube_client.threading.Thread') as fake_thread_cls: - fake_thread = fake_thread_cls.return_value - fake_thread.start = lambda: None +def test_download_returns_none_when_engine_not_wired(): + """Defensive: client without engine reference can't dispatch. + In production this never happens (orchestrator wires engine + immediately) but the soft-fail keeps tests + dev paths safe.""" + client = YouTubeClient.__new__(YouTubeClient) + client._engine = None + result = _run_async(client.download('youtube', 'v||t', 0)) + assert result is None + - result = _run_async(yt_client.download('youtube', 'abc123||Some Song', 0)) +def test_download_returns_uuid_download_id_for_valid_filename(yt_client_with_engine): + """Pinning: valid `video_id||title` → UUID download_id.""" + client, engine = yt_client_with_engine + + # Patch _download_sync so the worker thread's impl returns + # without doing real yt-dlp work. + with patch.object(client, '_download_sync', return_value='/tmp/x.mp3'): + result = _run_async(client.download('youtube', 'abc123||Some Song', 0)) assert result is not None - # UUID format — 36 chars with dashes at standard positions. assert len(result) == 36 assert result.count('-') == 4 -def test_download_populates_active_downloads_with_initial_state(yt_client): - """Pinning: after `download()` returns, the engine refactor will - move this dict into central state, but the SHAPE of the - per-download record must stay the same. Frontend, status APIs, - and post-processing all read these keys directly.""" - with patch('core.youtube_client.threading.Thread') as fake_thread_cls: - fake_thread_cls.return_value.start = lambda: None +def test_download_populates_engine_record_with_initial_state(yt_client_with_engine): + """Pinning: per-download record schema. STATE LOCATION CHANGED + in C2 (now in engine), but the SHAPE of the record is the same + — frontend / status APIs / context-key matching depend on these + keys.""" + client, engine = yt_client_with_engine + + # Hold the impl so we can read 'Initializing' / 'InProgress' state + # before the worker completes. + started = threading.Event() + release = threading.Event() + + def slow_impl(*args, **kwargs): + started.set() + release.wait(timeout=1.0) + return '/tmp/done.mp3' + + with patch.object(client, '_download_sync', side_effect=slow_impl): download_id = _run_async( - yt_client.download('youtube', 'video123||My Title', 5000) + client.download('youtube', 'video123||My Title', 5000) ) + started.wait(timeout=1.0) + record = engine.get_record('youtube', download_id) + + assert record is not None + assert record['id'] == download_id + assert record['filename'] == 'video123||My Title' # ORIGINAL form + assert record['username'] == 'youtube' + assert record['state'] in ('Initializing', 'InProgress, Downloading') + assert record['progress'] == 0.0 + assert record['file_path'] is None + # Source-specific extras must merge into the record. + assert record['video_id'] == 'video123' + assert record['url'] == 'https://www.youtube.com/watch?v=video123' + assert record['title'] == 'My Title' + + release.set() + + +def test_set_engine_configures_worker_delay(yt_client_with_engine): + """Pinning: when engine is wired, the YouTube download_delay + config (3s default) propagates to the worker so successive + downloads serialize with the same gap they did pre-C2.""" + client, engine = yt_client_with_engine + # Default delay is 3s. + assert engine.worker._get_delay('youtube') == 3.0 + + +# --------------------------------------------------------------------------- +# Query / cancel — engine-backed reads +# --------------------------------------------------------------------------- + + +def test_get_all_downloads_reads_engine_records(yt_client_with_engine): + client, engine = yt_client_with_engine + + # Seed engine with a fake record to mirror what dispatch would do. + engine.add_record('youtube', 'dl-1', { + 'id': 'dl-1', 'filename': 'v||t', 'username': 'youtube', + 'state': 'InProgress, Downloading', 'progress': 50.0, + 'size': 1000, 'transferred': 500, 'speed': 100, + }) + result = _run_async(client.get_all_downloads()) + assert len(result) == 1 + assert result[0].id == 'dl-1' + assert result[0].state == 'InProgress, Downloading' + + +def test_cancel_download_marks_cancelled_and_optionally_removes(yt_client_with_engine): + client, engine = yt_client_with_engine + + engine.add_record('youtube', 'dl-1', { + 'id': 'dl-1', 'filename': 'v||t', 'username': 'youtube', + 'state': 'InProgress, Downloading', 'progress': 50.0, + }) + + ok = _run_async(client.cancel_download('dl-1', None, remove=False)) + assert ok is True + assert engine.get_record('youtube', 'dl-1')['state'] == 'Cancelled' + + ok = _run_async(client.cancel_download('dl-1', None, remove=True)) + assert ok is True + assert engine.get_record('youtube', 'dl-1') is None + + +def test_clear_all_completed_drops_only_terminal_records(yt_client_with_engine): + client, engine = yt_client_with_engine + engine.add_record('youtube', 'done', {'id': 'done', 'state': 'Completed, Succeeded'}) + engine.add_record('youtube', 'erred', {'id': 'erred', 'state': 'Errored'}) + engine.add_record('youtube', 'live', {'id': 'live', 'state': 'InProgress, Downloading'}) + + _run_async(client.clear_all_completed_downloads()) - record = yt_client.active_downloads[download_id] - # Pin the state-dict schema. These keys are consumed by the - # status API + frontend + matched_downloads_context lookups. - assert record['id'] == download_id - assert record['filename'] == 'video123||My Title' # ORIGINAL encoded form, not parsed - assert record['username'] == 'youtube' - assert record['state'] == 'Initializing' # Soulseek-style state name - assert record['progress'] == 0.0 - assert record['size'] == 5000 - assert record['transferred'] == 0 - assert record['video_id'] == 'video123' - assert record['url'] == 'https://www.youtube.com/watch?v=video123' - assert record['title'] == 'My Title' - assert record['file_path'] is None # set by worker on completion - - -def test_download_spawns_daemon_thread_for_background_work(yt_client): - """Pinning: the worker MUST be a daemon thread so it doesn't - block process shutdown. Engine refactor's BackgroundDownloadWorker - must preserve this.""" - captured_kwargs = {} - - def capture_thread(*args, **kwargs): - captured_kwargs.update(kwargs) - result = type('FakeThread', (), {'start': lambda self: None})() - return result - - with patch('core.youtube_client.threading.Thread', side_effect=capture_thread): - _run_async(yt_client.download('youtube', 'v||t', 0)) - - assert captured_kwargs.get('daemon') is True - - -def test_download_thread_target_is_download_thread_worker(yt_client): - """Pinning: the spawned thread runs `_download_thread_worker`. - Phase C will replace this with `engine.dispatch_download(...)` - that calls `plugin._download_impl(...)`. The contract that's - preserved: a single thread per download, semaphore-serialized, - state updates flow through `active_downloads`.""" - captured_kwargs = {} - - def capture_thread(*args, **kwargs): - captured_kwargs.update(kwargs) - return type('FakeThread', (), {'start': lambda self: None})() - - with patch('core.youtube_client.threading.Thread', side_effect=capture_thread): - _run_async(yt_client.download('youtube', 'v||t', 0)) - - assert captured_kwargs.get('target') == yt_client._download_thread_worker - # First positional arg of the target is the download_id, then url, title, original_filename. - target_args = captured_kwargs.get('args', ()) - assert len(target_args) == 4 + assert engine.get_record('youtube', 'done') is None + assert engine.get_record('youtube', 'erred') is None + assert engine.get_record('youtube', 'live') is not None