From 73fb60a68a6d526bb385d4849f7c96fed3b89999 Mon Sep 17 00:00:00 2001 From: Broque Thomas <26755000+Nezreka@users.noreply.github.com> Date: Mon, 4 May 2026 13:49:36 -0700 Subject: [PATCH] C3: Migrate Tidal to engine.worker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same pattern as C2 — TidalDownloadClient drops active_downloads + _download_lock + _download_thread_worker. download() delegates to engine.worker.dispatch with _download_sync as the impl. Source-specific extras (track_id, display_name) merge into the engine record. The HLS-segment progress callback (_update_download_progress) now writes to engine state via engine.update_record instead of mutating the per-client dict in-place. Query/cancel methods (get_all_downloads, get_download_status, cancel_download, clear_all_completed_downloads) now read engine state via the same accessors as the YouTube migration. Pinning tests updated to assert engine state. Suite still green (313 download tests). Behavior preserved end-to-end. --- core/tidal_download_client.py | 265 ++++++++++---------------- tests/downloads/test_tidal_pinning.py | 210 ++++++++++---------- 2 files changed, 205 insertions(+), 270 deletions(-) diff --git a/core/tidal_download_client.py b/core/tidal_download_client.py index f6db0632..3a5649ed 100644 --- a/core/tidal_download_client.py +++ b/core/tidal_download_client.py @@ -116,12 +116,21 @@ class TidalDownloadClient: self.session: Optional['tidalapi.Session'] = None self._init_session() - self.active_downloads: Dict[str, Dict[str, Any]] = {} - self._download_lock = threading.Lock() - self._device_auth_future = None self._device_auth_link = None + # Engine reference is populated by set_engine() at registration + # time. Until then dispatch returns None — orchestrator wires + # this immediately so the only None case is tests that bypass + # the orchestrator. + self._engine = None + + def set_engine(self, engine): + """Engine callback — gives the client access to the central + thread worker + state store. Engine calls this during + ``register_plugin`` if the plugin defines it.""" + self._engine = engine + def set_shutdown_check(self, check_callable): self.shutdown_check = check_callable @@ -604,85 +613,33 @@ class TidalDownloadClient: ) from exc async def download(self, username: str, filename: str, file_size: int = 0) -> Optional[str]: - try: - if '||' not in filename: - logger.error(f"Invalid filename format: {filename}") - return None - - track_id_str, display_name = filename.split('||', 1) - try: - track_id = int(track_id_str) - except ValueError: - logger.error(f"Invalid Tidal track ID: {track_id_str}") - return None - - logger.info(f"Starting Tidal download: {display_name}") - - download_id = str(uuid.uuid4()) - - with self._download_lock: - self.active_downloads[download_id] = { - 'id': download_id, - 'filename': filename, - 'username': 'tidal', - 'state': 'Initializing', - 'progress': 0.0, - 'size': 0, - 'transferred': 0, - 'speed': 0, - 'time_remaining': None, - 'track_id': track_id, - 'display_name': display_name, - 'file_path': None, - } - - download_thread = threading.Thread( - target=self._download_thread_worker, - args=(download_id, track_id, display_name, filename), - daemon=True, - ) - download_thread.start() - - logger.info(f"Tidal download {download_id} started in background") - return download_id - - except Exception as e: - logger.error(f"Failed to start Tidal download: {e}") - import traceback - traceback.print_exc() + if '||' not in filename: + logger.error(f"Invalid filename format: {filename}") + return None + if self._engine is None: + logger.error("Tidal client has no engine reference — cannot dispatch download") return None - def _download_thread_worker(self, download_id: str, track_id: int, display_name: str, original_filename: str): + track_id_str, display_name = filename.split('||', 1) try: - with self._download_lock: - if download_id in self.active_downloads: - self.active_downloads[download_id]['state'] = 'InProgress, Downloading' - - file_path = self._download_sync(download_id, track_id, display_name) - - if file_path: - with self._download_lock: - if download_id in self.active_downloads: - self.active_downloads[download_id]['state'] = 'Completed, Succeeded' - self.active_downloads[download_id]['progress'] = 100.0 - self.active_downloads[download_id]['file_path'] = file_path - - logger.info(f"Tidal download {download_id} completed: {file_path}") - else: - with self._download_lock: - if download_id in self.active_downloads: - self.active_downloads[download_id]['state'] = 'Errored' - - logger.error(f"Tidal download {download_id} failed") - - except Exception as e: - logger.error(f"Tidal download thread failed for {download_id}: {e}") - import traceback - traceback.print_exc() + track_id = int(track_id_str) + except ValueError: + logger.error(f"Invalid Tidal track ID: {track_id_str}") + return None - with self._download_lock: - if download_id in self.active_downloads: - self.active_downloads[download_id]['state'] = 'Errored' + logger.info(f"Starting Tidal download: {display_name}") + + return self._engine.worker.dispatch( + source_name='tidal', + target_id=track_id, + display_name=display_name, + original_filename=filename, + impl_callable=self._download_sync, + extra_record_fields={ + 'track_id': track_id, + 'display_name': display_name, + }, + ) def _download_sync(self, download_id: str, track_id: int, display_name: str) -> Optional[str]: if not self.session or not self.session.check_login(): @@ -727,9 +684,8 @@ class TidalDownloadClient: speed_start = time.time() segments_completed = 0 - with self._download_lock: - if download_id in self.active_downloads: - self.active_downloads[download_id]['size'] = 0 + if self._engine is not None: + self._engine.update_record('tidal', download_id, {'size': 0}) with intermediate_path.open('wb') as output_file: if init_uri: @@ -828,97 +784,82 @@ class TidalDownloadClient: def _update_download_progress(self, download_id: str, downloaded: int, segments_completed: int, total_segments: int, speed_start: float): - with self._download_lock: - if download_id not in self.active_downloads: - return - info = self.active_downloads[download_id] - info['transferred'] = downloaded - - now = time.time() - elapsed_total = now - speed_start - speed = int(downloaded / elapsed_total) if elapsed_total > 0 else 0 - info['speed'] = speed - - if total_segments > 0: - progress = (segments_completed / total_segments) * 100 - info['progress'] = round(min(progress, 99.9), 1) - - time_remaining = None - if speed > 0: - remaining_bytes = downloaded * (total_segments / max(segments_completed, 1)) - downloaded - if remaining_bytes > 0: - time_remaining = int(remaining_bytes / speed) - info['time_remaining'] = time_remaining + if self._engine is None: + return + record = self._engine.get_record('tidal', download_id) + if record is None: + return - async def get_all_downloads(self) -> List[DownloadStatus]: - download_statuses = [] - - with self._download_lock: - for _download_id, info in self.active_downloads.items(): - status = DownloadStatus( - id=info['id'], - filename=info['filename'], - username=info['username'], - state=info['state'], - progress=info['progress'], - size=info['size'], - transferred=info['transferred'], - speed=info['speed'], - time_remaining=info.get('time_remaining'), - file_path=info.get('file_path'), - ) - download_statuses.append(status) + now = time.time() + elapsed_total = now - speed_start + speed = int(downloaded / elapsed_total) if elapsed_total > 0 else 0 + + progress = record.get('progress', 0.0) + if total_segments > 0: + progress = round(min((segments_completed / total_segments) * 100, 99.9), 1) + + time_remaining = None + if speed > 0: + remaining_bytes = downloaded * (total_segments / max(segments_completed, 1)) - downloaded + if remaining_bytes > 0: + time_remaining = int(remaining_bytes / speed) + + self._engine.update_record('tidal', download_id, { + 'transferred': downloaded, + 'speed': speed, + 'progress': progress, + 'time_remaining': time_remaining, + }) - return download_statuses + def _record_to_status(self, record): + return DownloadStatus( + id=record['id'], + filename=record['filename'], + username=record['username'], + state=record['state'], + progress=record['progress'], + size=record.get('size', 0), + transferred=record.get('transferred', 0), + speed=record.get('speed', 0), + time_remaining=record.get('time_remaining'), + file_path=record.get('file_path'), + ) - async def get_download_status(self, download_id: str) -> Optional[DownloadStatus]: - with self._download_lock: - if download_id not in self.active_downloads: - return None + async def get_all_downloads(self) -> List[DownloadStatus]: + if self._engine is None: + return [] + return [ + self._record_to_status(record) + for record in self._engine.iter_records_for_source('tidal') + ] - info = self.active_downloads[download_id] - return DownloadStatus( - id=info['id'], - filename=info['filename'], - username=info['username'], - state=info['state'], - progress=info['progress'], - size=info['size'], - transferred=info['transferred'], - speed=info['speed'], - time_remaining=info.get('time_remaining'), - file_path=info.get('file_path'), - ) + async def get_download_status(self, download_id: str) -> Optional[DownloadStatus]: + if self._engine is None: + return None + record = self._engine.get_record('tidal', download_id) + return self._record_to_status(record) if record is not None else None async def cancel_download(self, download_id: str, username: str = None, remove: bool = False) -> bool: - try: - with self._download_lock: - if download_id not in self.active_downloads: - logger.warning(f"Download {download_id} not found") - return False - - self.active_downloads[download_id]['state'] = 'Cancelled' - logger.info(f"Marked Tidal download {download_id} as cancelled") - - if remove: - del self.active_downloads[download_id] - logger.info(f"Removed Tidal download {download_id} from queue") - - return True - except Exception as e: - logger.error(f"Failed to cancel download {download_id}: {e}") + if self._engine is None: + return False + if self._engine.get_record('tidal', download_id) is None: + logger.warning(f"Tidal download {download_id} not found") return False + self._engine.update_record('tidal', download_id, {'state': 'Cancelled'}) + logger.info(f"Marked Tidal download {download_id} as cancelled") + if remove: + self._engine.remove_record('tidal', download_id) + logger.info(f"Removed Tidal download {download_id} from queue") + return True async def clear_all_completed_downloads(self) -> bool: + if self._engine is None: + return True try: - with self._download_lock: - ids_to_remove = [ - did for did, info in self.active_downloads.items() - if info.get('state', '') in ('Completed, Succeeded', 'Cancelled', 'Errored', 'Aborted') - ] - for did in ids_to_remove: - del self.active_downloads[did] - + terminal = {'Completed, Succeeded', 'Cancelled', 'Errored', 'Aborted'} + for record in list(self._engine.iter_records_for_source('tidal')): + if record.get('state') in terminal: + self._engine.remove_record('tidal', record['id']) return True except Exception as e: logger.error(f"Error clearing downloads: {e}") diff --git a/tests/downloads/test_tidal_pinning.py b/tests/downloads/test_tidal_pinning.py index c84b42e8..405ee21b 100644 --- a/tests/downloads/test_tidal_pinning.py +++ b/tests/downloads/test_tidal_pinning.py @@ -1,11 +1,9 @@ -"""Phase A pinning tests for TidalDownloadClient's download lifecycle. - -Tidal authenticates via tidalapi OAuth, fetches HLS manifests for a -track_id, demuxes the FLAC stream from MP4 container with ffmpeg, -and writes the result to disk. The thread worker + state-dict -pattern is identical to YouTube's — Phase C will lift both into -the engine. These tests pin the SHAPE of the per-download record -and the filename encoding so the lift can't drift the contract. +"""Phase A pinning tests for TidalDownloadClient — UPDATED for Phase C3. + +Post-C3 the client no longer owns its own ``active_downloads`` dict +or thread spawn — both moved into the engine's BackgroundDownloadWorker. +Pinning tests now read state from ``engine.get_record('tidal', ...)`` +instead of ``client.active_downloads[...]``. """ from __future__ import annotations @@ -17,7 +15,7 @@ from unittest.mock import patch import pytest -# tidalapi may not be importable; tidal_download_client guards for that. +from core.download_engine import DownloadEngine from core.tidal_download_client import TidalDownloadClient @@ -30,19 +28,18 @@ def _run_async(coro): @pytest.fixture -def tidal_client(): - """A bare TidalDownloadClient — bypasses tidalapi.Session init. - Tests that need an authenticated state set client.session.check_login - via mock.""" +def tidal_client_with_engine(): client = TidalDownloadClient.__new__(TidalDownloadClient) client.download_path = Path('./test_tidal_downloads') client.shutdown_check = None client.session = None - client.active_downloads = {} - client._download_lock = threading.Lock() client._device_auth_future = None client._device_auth_link = None - return client + client._engine = None + + engine = DownloadEngine() + client.set_engine(engine) + return client, engine # --------------------------------------------------------------------------- @@ -50,21 +47,18 @@ def tidal_client(): # --------------------------------------------------------------------------- -def test_is_authenticated_false_when_no_session(tidal_client): - """Pinning: no session → not authenticated. Used by orchestrator - fallback to skip Tidal when user hasn't logged in.""" - assert tidal_client.is_authenticated() is False +def test_is_authenticated_false_when_no_session(tidal_client_with_engine): + client, _ = tidal_client_with_engine + assert client.is_authenticated() is False -def test_is_authenticated_false_when_session_check_login_raises(tidal_client): - """Pinning: tidalapi.Session.check_login() can raise on expired - tokens. Client swallows + reports False — orchestrator skip - behavior depends on this.""" +def test_is_authenticated_false_when_session_check_login_raises(tidal_client_with_engine): + client, _ = tidal_client_with_engine fake_session = type('FakeSession', (), { 'check_login': lambda self: (_ for _ in ()).throw(RuntimeError("expired")), })() - tidal_client.session = fake_session - assert tidal_client.is_authenticated() is False + client.session = fake_session + assert client.is_authenticated() is False # --------------------------------------------------------------------------- @@ -72,102 +66,102 @@ def test_is_authenticated_false_when_session_check_login_raises(tidal_client): # --------------------------------------------------------------------------- -def test_download_returns_none_for_invalid_filename_format(tidal_client): - """Pinning: Tidal encodes search results as `track_id||display`. - Missing `||` → None (not exception).""" - result = _run_async(tidal_client.download('tidal', 'no-separator', 0)) +def test_download_returns_none_for_invalid_filename_format(tidal_client_with_engine): + client, _ = tidal_client_with_engine + result = _run_async(client.download('tidal', 'no-separator', 0)) assert result is None -def test_download_returns_none_for_non_integer_track_id(tidal_client): - """Pinning: track_id portion MUST parse as int. Tidal API uses - integer track IDs. Non-int → None (not exception).""" - result = _run_async(tidal_client.download('tidal', 'not-a-number||some title', 0)) +def test_download_returns_none_for_non_integer_track_id(tidal_client_with_engine): + client, _ = tidal_client_with_engine + result = _run_async(client.download('tidal', 'not-int||title', 0)) assert result is None -def test_download_returns_uuid_for_valid_filename(tidal_client): - """Pinning: valid `||display` filename returns a UUID - download_id immediately; download runs in background thread.""" - with patch('core.tidal_download_client.threading.Thread') as fake_thread_cls: - fake_thread_cls.return_value.start = lambda: None - result = _run_async(tidal_client.download('tidal', '12345||Some Song', 0)) +def test_download_returns_none_when_engine_not_wired(): + client = TidalDownloadClient.__new__(TidalDownloadClient) + client._engine = None + result = _run_async(client.download('tidal', '12345||x', 0)) + assert result is None + +def test_download_returns_uuid_for_valid_filename(tidal_client_with_engine): + client, _ = tidal_client_with_engine + with patch.object(client, '_download_sync', return_value='/tmp/x.flac'): + result = _run_async(client.download('tidal', '12345||Some Song', 0)) assert result is not None - assert len(result) == 36 # UUID4 format - - -def test_download_populates_active_downloads_with_initial_state(tidal_client): - """Pinning: per-download record schema. Engine refactor moves - this dict into central state but the SHAPE must stay the same - for status APIs / frontend / post-processing consumers.""" - with patch('core.tidal_download_client.threading.Thread') as fake_thread_cls: - fake_thread_cls.return_value.start = lambda: None - download_id = _run_async( - tidal_client.download('tidal', '999||My Tidal Song', 0) - ) - - record = tidal_client.active_downloads[download_id] - assert record['id'] == download_id - assert record['filename'] == '999||My Tidal Song' # ORIGINAL encoded form - assert record['username'] == 'tidal' - assert record['state'] == 'Initializing' - assert record['progress'] == 0.0 - assert record['size'] == 0 # filled in by worker once HLS manifest fetched - assert record['track_id'] == 999 # parsed as int - assert record['display_name'] == 'My Tidal Song' - assert record['file_path'] is None - - -def test_download_spawns_daemon_thread_targeting_worker(tidal_client): - """Pinning: daemon thread targeting `_download_thread_worker` - with (download_id, track_id, display_name, original_filename). - Phase C replaces this with `engine.dispatch_download(plugin, ...)` - that calls `plugin._download_impl(track_id)`.""" - captured_kwargs = {} - - def capture_thread(*args, **kwargs): - captured_kwargs.update(kwargs) - return type('FakeThread', (), {'start': lambda self: None})() - - with patch('core.tidal_download_client.threading.Thread', side_effect=capture_thread): - _run_async(tidal_client.download('tidal', '777||Title', 0)) - - assert captured_kwargs.get('daemon') is True - assert captured_kwargs.get('target') == tidal_client._download_thread_worker - args = captured_kwargs.get('args', ()) - assert len(args) == 4 - # Args: (download_id, track_id, display_name, original_filename) - assert args[1] == 777 # track_id parsed as int - assert args[2] == 'Title' - assert args[3] == '777||Title' # original encoded filename + assert len(result) == 36 + + +def test_download_populates_engine_record_with_initial_state(tidal_client_with_engine): + client, engine = tidal_client_with_engine + started = threading.Event() + release = threading.Event() + + def slow_impl(*args, **kwargs): + started.set() + release.wait(timeout=1.0) + return '/tmp/done.flac' + + with patch.object(client, '_download_sync', side_effect=slow_impl): + download_id = _run_async(client.download('tidal', '999||My Tidal Song', 0)) + started.wait(timeout=1.0) + record = engine.get_record('tidal', download_id) + + assert record is not None + assert record['id'] == download_id + assert record['filename'] == '999||My Tidal Song' + assert record['username'] == 'tidal' + assert record['state'] in ('Initializing', 'InProgress, Downloading') + assert record['progress'] == 0.0 + assert record['track_id'] == 999 # parsed as int + assert record['display_name'] == 'My Tidal Song' + assert record['file_path'] is None + release.set() # --------------------------------------------------------------------------- -# get_all_downloads() +# Query / cancel — engine-backed reads # --------------------------------------------------------------------------- -def test_get_all_downloads_iterates_active_downloads(tidal_client): - """Pinning: returns one DownloadStatus per entry in - active_downloads. Engine refactor will replace this with a - central query — the per-record-to-DownloadStatus translation - must preserve the field mapping.""" - tidal_client.active_downloads = { - 'dl-1': { - 'id': 'dl-1', 'filename': '111||Song A', 'username': 'tidal', - 'state': 'InProgress, Downloading', 'progress': 50.0, - 'size': 1000, 'transferred': 500, 'speed': 100, - 'time_remaining': None, - }, - 'dl-2': { - 'id': 'dl-2', 'filename': '222||Song B', 'username': 'tidal', - 'state': 'Completed, Succeeded', 'progress': 100.0, - 'size': 2000, 'transferred': 2000, 'speed': 0, - 'time_remaining': None, - }, - } - result = _run_async(tidal_client.get_all_downloads()) +def test_get_all_downloads_reads_engine_records(tidal_client_with_engine): + client, engine = tidal_client_with_engine + engine.add_record('tidal', 'dl-1', { + 'id': 'dl-1', 'filename': '111||Song A', 'username': 'tidal', + 'state': 'InProgress, Downloading', 'progress': 50.0, + 'size': 1000, 'transferred': 500, 'speed': 100, + }) + engine.add_record('tidal', 'dl-2', { + 'id': 'dl-2', 'filename': '222||Song B', 'username': 'tidal', + 'state': 'Completed, Succeeded', 'progress': 100.0, + 'size': 2000, 'transferred': 2000, 'speed': 0, + }) + result = _run_async(client.get_all_downloads()) assert len(result) == 2 assert {r.id for r in result} == {'dl-1', 'dl-2'} assert {r.username for r in result} == {'tidal'} + + +def test_cancel_download_marks_cancelled(tidal_client_with_engine): + client, engine = tidal_client_with_engine + engine.add_record('tidal', 'dl-1', {'id': 'dl-1', 'state': 'InProgress, Downloading'}) + + ok = _run_async(client.cancel_download('dl-1', None, remove=False)) + assert ok is True + assert engine.get_record('tidal', 'dl-1')['state'] == 'Cancelled' + + ok = _run_async(client.cancel_download('dl-1', None, remove=True)) + assert ok is True + assert engine.get_record('tidal', 'dl-1') is None + + +def test_clear_all_completed_drops_only_terminal_records(tidal_client_with_engine): + client, engine = tidal_client_with_engine + engine.add_record('tidal', 'done', {'id': 'done', 'state': 'Completed, Succeeded'}) + engine.add_record('tidal', 'live', {'id': 'live', 'state': 'InProgress, Downloading'}) + + _run_async(client.clear_all_completed_downloads()) + + assert engine.get_record('tidal', 'done') is None + assert engine.get_record('tidal', 'live') is not None