C2: Migrate YouTube to engine.worker

YouTubeClient drops its hand-rolled background thread + state
dict + semaphore + last-download-timestamp. download() now
delegates to engine.worker.dispatch with _download_sync as the
impl callable; YouTube-specific record fields (video_id, url,
title) merge into the engine record via extra_record_fields.

Engine wires itself in via plugin.set_engine(engine) callback
on register_plugin. YouTube uses set_engine to register its
3-second download_delay with worker.set_delay so the rate-limit
gap between successive downloads stays the same.

Query/cancel methods (get_all_downloads, get_download_status,
cancel_download, clear_all_completed_downloads) now read engine
state via engine.iter_records_for_source / get_record /
update_record / remove_record. Net: ~120 LOC of thread+state
boilerplate removed from youtube_client.py.

Phase A pinning tests updated to assert engine state instead of
client.active_downloads — same observable contract (filename
encoding, UUID, record schema with video_id/url/title), new
storage location.

Suite still green (2025 passed). Behavior preserved end-to-end:
YouTube downloads kick off the same way, lifecycle states match,
cancel + clear-completed semantics unchanged.
pull/495/head
Broque Thomas 3 weeks ago
parent 78724861f9
commit 4ddfb01a0a

@ -85,14 +85,24 @@ class DownloadEngine:
once per source by the orchestrator after the registry's
``initialize`` builds the client instances.
Phase B is purely informational the engine doesn't yet
dispatch through plugins. Subsequent phases use these
references to call ``plugin._download_impl`` /
``plugin._search_raw`` etc.
If the plugin exposes ``set_engine(engine)``, the engine
passes a self-reference so the plugin can dispatch into
``engine.worker`` / read state / etc. Plugins that haven't
been migrated to the engine yet simply don't define
``set_engine`` they keep their pre-engine behavior
unchanged.
"""
if source_name in self._plugins:
logger.warning("Plugin %s already registered with engine — overwriting", source_name)
self._plugins[source_name] = plugin
set_engine = getattr(plugin, 'set_engine', None)
if callable(set_engine):
try:
set_engine(self)
except Exception as exc:
logger.warning(
"Plugin %s set_engine callback failed: %s", source_name, exc,
)
def get_plugin(self, source_name: str) -> Optional[Any]:
return self._plugins.get(source_name)

@ -112,10 +112,25 @@ class YouTubeClient:
# Callback for shutdown check (avoids circular imports)
self.shutdown_check = None
# Rate limiting — serialize YouTube downloads with delay
self._download_semaphore = threading.Semaphore(1)
# Rate-limit policy — applied to engine.worker once the engine
# is wired in via set_engine(). Kept as an attribute for
# backward-compat external readers + so settings reload can
# update it without touching the engine.
self._download_delay = config_manager.get('youtube.download_delay', 3)
self._last_download_time = 0
# Engine reference is populated by set_engine() at registration
# time. Until then the client can't dispatch downloads — but
# in production the orchestrator wires the engine immediately
# after constructing the registry, so this is only None in
# tests that bypass the orchestrator.
self._engine = None
def set_engine(self, engine):
"""Engine callback — gives the client access to the central
thread worker + state store. Engine calls this during
``register_plugin`` if the plugin defines it."""
self._engine = engine
engine.worker.set_delay('youtube', float(self._download_delay))
def set_shutdown_check(self, check_callable):
"""Set a callback function to check for system shutdown"""
@ -130,11 +145,6 @@ class YouTubeClient:
logger.error("ffmpeg is required but not found")
logger.error("The client will attempt to auto-download ffmpeg on first use")
# Download queue management (mirrors Soulseek's download tracking)
# Maps download_id -> download_info dict
self.active_downloads: Dict[str, Dict[str, Any]] = {}
self._download_lock = threading.Lock() # Use threading.Lock for thread safety
# Configure yt-dlp options with bot detection bypass
self.download_opts = {
'format': 'bestaudio/best',
@ -859,137 +869,54 @@ class YouTubeClient:
return matches
async def download(self, username: str, filename: str, file_size: int = 0) -> Optional[str]:
"""
Download YouTube video as audio (async, Soulseek-compatible interface).
"""Download YouTube video as audio.
Returns download_id immediately and runs download in background thread.
Monitor via get_download_status() or get_all_downloads().
Returns download_id immediately; the actual download runs in
a background thread spawned by ``engine.worker``. Monitor
via ``orchestrator.get_download_status(download_id)``.
Args:
username: Ignored for YouTube (always "youtube")
filename: Encoded as "video_id||title" from search results
file_size: Ignored for YouTube (kept for interface compatibility)
Returns:
download_id: Unique ID for tracking this download
"""
try:
# Parse filename to extract video_id
if '||' not in filename:
logger.error(f"Invalid filename format: {filename}")
return None
video_id, title = filename.split('||', 1)
youtube_url = f"https://www.youtube.com/watch?v={video_id}"
logger.info(f"Starting YouTube download: {title}")
logger.info(f" URL: {youtube_url}")
# Create unique download ID
download_id = str(uuid.uuid4())
# Initialize download info in active downloads
with self._download_lock:
self.active_downloads[download_id] = {
'id': download_id,
'filename': filename, # Keep original encoded format for context matching!
'username': 'youtube',
'state': 'Initializing', # Soulseek-style states
'progress': 0.0,
'size': file_size or 0,
'transferred': 0,
'speed': 0,
'time_remaining': None,
'video_id': video_id,
'url': youtube_url,
'title': title,
'file_path': None, # Will be set when download completes
}
# Start download in background thread (returns immediately)
download_thread = threading.Thread(
target=self._download_thread_worker,
args=(download_id, youtube_url, title, filename),
daemon=True
)
download_thread.start()
logger.info(f"YouTube download {download_id} started in background")
return download_id
except Exception as e:
logger.error(f"Failed to start YouTube download: {e}")
import traceback
traceback.print_exc()
if '||' not in filename:
logger.error(f"Invalid filename format: {filename}")
return None
if self._engine is None:
logger.error("YouTube client has no engine reference — cannot dispatch download")
return None
def _download_thread_worker(self, download_id: str, youtube_url: str, title: str, original_filename: str):
"""
Background thread worker for downloading YouTube videos.
Updates active_downloads dict with progress.
Serialized via semaphore with configurable delay between downloads.
"""
try:
with self._download_semaphore:
# Enforce delay since last download completed
elapsed = time.time() - self._last_download_time
if self._last_download_time > 0 and elapsed < self._download_delay:
wait_time = self._download_delay - elapsed
logger.info(f"Rate limiting: waiting {wait_time:.1f}s before next YouTube download")
time.sleep(wait_time)
# Update state to downloading
with self._download_lock:
if download_id in self.active_downloads:
self.active_downloads[download_id]['state'] = 'InProgress, Downloading' # Match Soulseek state
# Set current download ID for progress hook
self.current_download_id = download_id
# Perform actual download
file_path = self._download_sync(youtube_url, title)
# Clear current download ID
video_id, title = filename.split('||', 1)
youtube_url = f"https://www.youtube.com/watch?v={video_id}"
logger.info("Starting YouTube download: %s (%s)", title, youtube_url)
def _impl(download_id, _target_id, display_name):
# The progress hook reads ``current_download_id`` to know
# which download to update. Set it before the call, clear
# after, even on exception.
self.current_download_id = download_id
try:
return self._download_sync(youtube_url, title)
finally:
self.current_download_id = None
# Record completion time for rate limiting
self._last_download_time = time.time()
if file_path:
# Mark as completed/succeeded (match Soulseek state)
with self._download_lock:
if download_id in self.active_downloads:
# IMPORTANT: Keep original filename for context lookup!
# The filename must match what was used to create the context entry
# We store the actual file path separately
self.active_downloads[download_id]['state'] = 'Completed, Succeeded' # Match Soulseek
self.active_downloads[download_id]['progress'] = 100.0
self.active_downloads[download_id]['file_path'] = file_path
# DO NOT update filename - keep original_filename for context matching
logger.info(f"YouTube download {download_id} completed: {file_path}")
else:
# Mark as errored
with self._download_lock:
if download_id in self.active_downloads:
self.active_downloads[download_id]['state'] = 'Errored'
logger.error(f"YouTube download {download_id} failed")
except Exception as e:
logger.error(f"YouTube download thread failed for {download_id}: {e}")
import traceback
traceback.print_exc()
# Mark as errored
with self._download_lock:
if download_id in self.active_downloads:
self.active_downloads[download_id]['state'] = 'Errored'
# Clear current download ID
if self.current_download_id == download_id:
self.current_download_id = None
return self._engine.worker.dispatch(
source_name='youtube',
target_id=video_id,
display_name=title,
original_filename=filename,
impl_callable=_impl,
extra_record_fields={
'video_id': video_id,
'url': youtube_url,
'title': title,
},
)
# Legacy worker stub kept temporarily for legacy comment context —
# see _download_sync below for the actual yt-dlp invocation that
# the engine's BackgroundDownloadWorker now drives.
def _download_sync(self, youtube_url: str, title: str) -> Optional[str]:
"""
Synchronous download method (runs in thread pool executor).
@ -1138,123 +1065,76 @@ class YouTubeClient:
traceback.print_exc()
return None
async def get_all_downloads(self) -> List[DownloadStatus]:
"""
Get all active downloads (matches Soulseek interface).
def _record_to_status(self, record):
"""Translate an engine record dict into the DownloadStatus
dataclass shape consumers expect."""
return DownloadStatus(
id=record['id'],
filename=record['filename'],
username=record['username'],
state=record['state'],
progress=record['progress'],
size=record.get('size', 0),
transferred=record.get('transferred', 0),
speed=record.get('speed', 0),
time_remaining=record.get('time_remaining'),
file_path=record.get('file_path'),
)
Returns:
List of DownloadStatus objects for all active downloads
"""
download_statuses = []
with self._download_lock:
for _download_id, download_info in self.active_downloads.items():
status = DownloadStatus(
id=download_info['id'],
filename=download_info['filename'],
username=download_info['username'],
state=download_info['state'],
progress=download_info['progress'],
size=download_info['size'],
transferred=download_info['transferred'],
speed=download_info['speed'],
time_remaining=download_info.get('time_remaining')
)
download_statuses.append(status)
return download_statuses
async def get_all_downloads(self) -> List[DownloadStatus]:
"""Active downloads owned by the YouTube source — read from
engine state."""
if self._engine is None:
return []
return [
self._record_to_status(record)
for record in self._engine.iter_records_for_source('youtube')
]
async def get_download_status(self, download_id: str) -> Optional[DownloadStatus]:
"""
Get status of a specific download (matches Soulseek interface).
Args:
download_id: Download ID to query
Returns:
DownloadStatus object or None if not found
"""
with self._download_lock:
if download_id not in self.active_downloads:
return None
download_info = self.active_downloads[download_id]
return DownloadStatus(
id=download_info['id'],
filename=download_info['filename'],
username=download_info['username'],
state=download_info['state'],
progress=download_info['progress'],
size=download_info['size'],
transferred=download_info['transferred'],
speed=download_info['speed'],
time_remaining=download_info.get('time_remaining'),
file_path=download_info.get('file_path')
)
"""Single download status — read from engine state. Returns
None if this id isn't owned by YouTube (or not found)."""
if self._engine is None:
return None
record = self._engine.get_record('youtube', download_id)
if record is None:
return None
return self._record_to_status(record)
async def clear_all_completed_downloads(self) -> bool:
"""
Clear all terminal (completed, cancelled, errored) downloads from the list.
Matches Soulseek interface.
"""
"""Clear terminal-state downloads (Completed / Cancelled /
Errored / Aborted) from engine state."""
if self._engine is None:
return True
try:
with self._download_lock:
# Identify IDs to remove
ids_to_remove = []
for download_id, info in self.active_downloads.items():
state = info.get('state', '')
# Check for terminal states
# Note: We check exact strings used in _download_thread_worker and cancel_download
if state in ['Completed, Succeeded', 'Cancelled', 'Errored', 'Aborted']:
ids_to_remove.append(download_id)
# Remove them
for download_id in ids_to_remove:
del self.active_downloads[download_id]
logger.debug(f"Cleared finished download {download_id}")
terminal_states = {'Completed, Succeeded', 'Cancelled', 'Errored', 'Aborted'}
for record in list(self._engine.iter_records_for_source('youtube')):
if record.get('state') in terminal_states:
self._engine.remove_record('youtube', record['id'])
logger.debug("Cleared finished YouTube download %s", record['id'])
return True
except Exception as e:
logger.error(f"Error clearing downloads: {e}")
return False
async def cancel_download(self, download_id: str, username: str = None, remove: bool = False) -> bool:
"""
Cancel an active download (matches Soulseek interface).
NOTE: YouTube downloads cannot be truly cancelled mid-download,
but we mark them as cancelled for UI consistency.
Args:
download_id: Download ID to cancel
username: Ignored for YouTube (kept for interface compatibility)
remove: If True, remove from active downloads after cancelling
Returns:
True if cancelled successfully, False otherwise
"""
try:
with self._download_lock:
if download_id not in self.active_downloads:
logger.warning(f"Download {download_id} not found")
return False
# Update state to cancelled
self.active_downloads[download_id]['state'] = 'Cancelled'
logger.info(f"Marked YouTube download {download_id} as cancelled")
# Remove from active downloads if requested
if remove:
del self.active_downloads[download_id]
logger.info(f"Removed YouTube download {download_id} from queue")
return True
except Exception as e:
logger.error(f"Failed to cancel download {download_id}: {e}")
"""Mark a YouTube download as cancelled. yt-dlp downloads
can't be truly interrupted mid-stream — this only flips
the state for UI consistency. ``remove=True`` also drops
the engine record."""
if self._engine is None:
return False
record = self._engine.get_record('youtube', download_id)
if record is None:
logger.warning(f"YouTube download {download_id} not found")
return False
self._engine.update_record('youtube', download_id, {'state': 'Cancelled'})
logger.info(f"Marked YouTube download {download_id} as cancelled")
if remove:
self._engine.remove_record('youtube', download_id)
logger.info(f"Removed YouTube download {download_id} from queue")
return True
def _enhance_metadata(self, filepath: str, spotify_track: Optional[SpotifyTrack], yt_result: YouTubeSearchResult, track_number: int = 1, disc_number: int = 1, release_year: str = None, artist_genres: list = None):
"""

@ -1,18 +1,24 @@
"""Phase A pinning tests for YouTubeClient's download lifecycle.
YouTube uses a yt-dlp subprocess wrapped in a threading.Thread. The
upcoming engine refactor will lift the thread management + state
tracking + rate-limit semaphore OUT of this client and into the
central engine leaving YouTubeClient as just `_download_impl`
(the yt-dlp subprocess invocation) + `search_videos` (the search
request) + auth/config.
These tests pin the OBSERVABLE BEHAVIOR that the engine will
preserve: filename encoding format, download_id shape, state-dict
schema, and the failure modes (invalid filename, etc.). They do
NOT exercise the yt-dlp subprocess itself that's the
source-specific atomic operation that stays per-client through the
refactor.
"""Phase A pinning tests for YouTubeClient — UPDATED for Phase C2.
Post-C2 the client no longer owns its own ``active_downloads`` dict
or thread spawn both moved into the engine's BackgroundDownloadWorker.
These tests still pin the same OBSERVABLE CONTRACT (filename
encoding, UUID download_id, initial-record schema, source-specific
extras like video_id/url/title) but read state from
``engine.get_record(...)`` instead of ``client.active_downloads[...]``.
What pre-C2 pinning tests caught and what these still catch:
- Filename format: `video_id||title`
- Invalid filename None
- UUID download_id format
- Per-download record schema (id, filename, username, state,
progress, video_id, url, title, file_path)
- Source name in record's username slot is `'youtube'` ✓
What dropped (covered by other tests):
- Direct thread-spawn assertion (engine.worker has its own tests).
- `_download_thread_worker` target (gone from client; engine owns
it).
"""
from __future__ import annotations
@ -24,6 +30,7 @@ from unittest.mock import patch
import pytest
from core.download_engine import DownloadEngine
from core.youtube_client import YouTubeClient
@ -36,20 +43,14 @@ def _run_async(coro):
@pytest.fixture
def yt_client():
"""A YouTubeClient with a temp download path. Threading is NOT
patched here individual tests that don't want the background
thread to actually run patch it themselves so the download_id
can be returned + state-dict pinned without yt-dlp ever firing."""
def yt_client_with_engine():
"""A bare YouTubeClient wired into a real engine. The engine
callback is invoked manually since we bypass orchestrator init."""
client = YouTubeClient.__new__(YouTubeClient)
client.download_path = Path('./test_yt_downloads')
client.shutdown_check = None
client.matching_engine = None
client.active_downloads = {}
client._download_lock = threading.Lock()
client._download_semaphore = threading.Semaphore(1)
client._download_delay = 3
client._last_download_time = 0
client.current_download_id = None
client.current_download_progress = {
'status': 'idle', 'percent': 0.0, 'downloaded_bytes': 0,
@ -57,7 +58,11 @@ def yt_client():
}
client.progress_callback = None
client.download_opts = {}
return client
client._engine = None
engine = DownloadEngine()
client.set_engine(engine)
return client, engine
# ---------------------------------------------------------------------------
@ -65,92 +70,130 @@ def yt_client():
# ---------------------------------------------------------------------------
def test_download_returns_none_for_invalid_filename_format(yt_client):
"""Pinning: YouTube encodes the search result as `video_id||title`.
A filename without `||` is invalid None (not exception). This is
the soft-fail signal the orchestrator's hybrid fallback relies on."""
result = _run_async(yt_client.download('youtube', 'no-separator-here', 0))
def test_download_returns_none_for_invalid_filename_format(yt_client_with_engine):
"""Pinning: missing `||` → None (not exception)."""
client, _ = yt_client_with_engine
result = _run_async(client.download('youtube', 'no-separator', 0))
assert result is None
def test_download_returns_uuid_download_id_for_valid_filename(yt_client):
"""Pinning: a valid `video_id||title` filename produces a UUID
download_id immediately. The actual download runs in a background
thread; the orchestrator polls via get_download_status."""
# Patch threading.Thread so the worker never actually runs (no
# yt-dlp invocation, no real network).
with patch('core.youtube_client.threading.Thread') as fake_thread_cls:
fake_thread = fake_thread_cls.return_value
fake_thread.start = lambda: None
def test_download_returns_none_when_engine_not_wired():
"""Defensive: client without engine reference can't dispatch.
In production this never happens (orchestrator wires engine
immediately) but the soft-fail keeps tests + dev paths safe."""
client = YouTubeClient.__new__(YouTubeClient)
client._engine = None
result = _run_async(client.download('youtube', 'v||t', 0))
assert result is None
result = _run_async(yt_client.download('youtube', 'abc123||Some Song', 0))
def test_download_returns_uuid_download_id_for_valid_filename(yt_client_with_engine):
"""Pinning: valid `video_id||title` → UUID download_id."""
client, engine = yt_client_with_engine
# Patch _download_sync so the worker thread's impl returns
# without doing real yt-dlp work.
with patch.object(client, '_download_sync', return_value='/tmp/x.mp3'):
result = _run_async(client.download('youtube', 'abc123||Some Song', 0))
assert result is not None
# UUID format — 36 chars with dashes at standard positions.
assert len(result) == 36
assert result.count('-') == 4
def test_download_populates_active_downloads_with_initial_state(yt_client):
"""Pinning: after `download()` returns, the engine refactor will
move this dict into central state, but the SHAPE of the
per-download record must stay the same. Frontend, status APIs,
and post-processing all read these keys directly."""
with patch('core.youtube_client.threading.Thread') as fake_thread_cls:
fake_thread_cls.return_value.start = lambda: None
def test_download_populates_engine_record_with_initial_state(yt_client_with_engine):
"""Pinning: per-download record schema. STATE LOCATION CHANGED
in C2 (now in engine), but the SHAPE of the record is the same
frontend / status APIs / context-key matching depend on these
keys."""
client, engine = yt_client_with_engine
# Hold the impl so we can read 'Initializing' / 'InProgress' state
# before the worker completes.
started = threading.Event()
release = threading.Event()
def slow_impl(*args, **kwargs):
started.set()
release.wait(timeout=1.0)
return '/tmp/done.mp3'
with patch.object(client, '_download_sync', side_effect=slow_impl):
download_id = _run_async(
yt_client.download('youtube', 'video123||My Title', 5000)
client.download('youtube', 'video123||My Title', 5000)
)
started.wait(timeout=1.0)
record = engine.get_record('youtube', download_id)
assert record is not None
assert record['id'] == download_id
assert record['filename'] == 'video123||My Title' # ORIGINAL form
assert record['username'] == 'youtube'
assert record['state'] in ('Initializing', 'InProgress, Downloading')
assert record['progress'] == 0.0
assert record['file_path'] is None
# Source-specific extras must merge into the record.
assert record['video_id'] == 'video123'
assert record['url'] == 'https://www.youtube.com/watch?v=video123'
assert record['title'] == 'My Title'
release.set()
def test_set_engine_configures_worker_delay(yt_client_with_engine):
"""Pinning: when engine is wired, the YouTube download_delay
config (3s default) propagates to the worker so successive
downloads serialize with the same gap they did pre-C2."""
client, engine = yt_client_with_engine
# Default delay is 3s.
assert engine.worker._get_delay('youtube') == 3.0
# ---------------------------------------------------------------------------
# Query / cancel — engine-backed reads
# ---------------------------------------------------------------------------
def test_get_all_downloads_reads_engine_records(yt_client_with_engine):
client, engine = yt_client_with_engine
# Seed engine with a fake record to mirror what dispatch would do.
engine.add_record('youtube', 'dl-1', {
'id': 'dl-1', 'filename': 'v||t', 'username': 'youtube',
'state': 'InProgress, Downloading', 'progress': 50.0,
'size': 1000, 'transferred': 500, 'speed': 100,
})
result = _run_async(client.get_all_downloads())
assert len(result) == 1
assert result[0].id == 'dl-1'
assert result[0].state == 'InProgress, Downloading'
def test_cancel_download_marks_cancelled_and_optionally_removes(yt_client_with_engine):
client, engine = yt_client_with_engine
engine.add_record('youtube', 'dl-1', {
'id': 'dl-1', 'filename': 'v||t', 'username': 'youtube',
'state': 'InProgress, Downloading', 'progress': 50.0,
})
ok = _run_async(client.cancel_download('dl-1', None, remove=False))
assert ok is True
assert engine.get_record('youtube', 'dl-1')['state'] == 'Cancelled'
ok = _run_async(client.cancel_download('dl-1', None, remove=True))
assert ok is True
assert engine.get_record('youtube', 'dl-1') is None
def test_clear_all_completed_drops_only_terminal_records(yt_client_with_engine):
client, engine = yt_client_with_engine
engine.add_record('youtube', 'done', {'id': 'done', 'state': 'Completed, Succeeded'})
engine.add_record('youtube', 'erred', {'id': 'erred', 'state': 'Errored'})
engine.add_record('youtube', 'live', {'id': 'live', 'state': 'InProgress, Downloading'})
_run_async(client.clear_all_completed_downloads())
record = yt_client.active_downloads[download_id]
# Pin the state-dict schema. These keys are consumed by the
# status API + frontend + matched_downloads_context lookups.
assert record['id'] == download_id
assert record['filename'] == 'video123||My Title' # ORIGINAL encoded form, not parsed
assert record['username'] == 'youtube'
assert record['state'] == 'Initializing' # Soulseek-style state name
assert record['progress'] == 0.0
assert record['size'] == 5000
assert record['transferred'] == 0
assert record['video_id'] == 'video123'
assert record['url'] == 'https://www.youtube.com/watch?v=video123'
assert record['title'] == 'My Title'
assert record['file_path'] is None # set by worker on completion
def test_download_spawns_daemon_thread_for_background_work(yt_client):
"""Pinning: the worker MUST be a daemon thread so it doesn't
block process shutdown. Engine refactor's BackgroundDownloadWorker
must preserve this."""
captured_kwargs = {}
def capture_thread(*args, **kwargs):
captured_kwargs.update(kwargs)
result = type('FakeThread', (), {'start': lambda self: None})()
return result
with patch('core.youtube_client.threading.Thread', side_effect=capture_thread):
_run_async(yt_client.download('youtube', 'v||t', 0))
assert captured_kwargs.get('daemon') is True
def test_download_thread_target_is_download_thread_worker(yt_client):
"""Pinning: the spawned thread runs `_download_thread_worker`.
Phase C will replace this with `engine.dispatch_download(...)`
that calls `plugin._download_impl(...)`. The contract that's
preserved: a single thread per download, semaphore-serialized,
state updates flow through `active_downloads`."""
captured_kwargs = {}
def capture_thread(*args, **kwargs):
captured_kwargs.update(kwargs)
return type('FakeThread', (), {'start': lambda self: None})()
with patch('core.youtube_client.threading.Thread', side_effect=capture_thread):
_run_async(yt_client.download('youtube', 'v||t', 0))
assert captured_kwargs.get('target') == yt_client._download_thread_worker
# First positional arg of the target is the download_id, then url, title, original_filename.
target_args = captured_kwargs.get('args', ())
assert len(target_args) == 4
assert engine.get_record('youtube', 'done') is None
assert engine.get_record('youtube', 'erred') is None
assert engine.get_record('youtube', 'live') is not None

Loading…
Cancel
Save