C3: Migrate Tidal to engine.worker

Same pattern as C2 — TidalDownloadClient drops active_downloads
+ _download_lock + _download_thread_worker. download() delegates
to engine.worker.dispatch with _download_sync as the impl.
Source-specific extras (track_id, display_name) merge into the
engine record.

The HLS-segment progress callback (_update_download_progress)
now writes to engine state via engine.update_record instead of
mutating the per-client dict in-place.

Query/cancel methods (get_all_downloads, get_download_status,
cancel_download, clear_all_completed_downloads) now read engine
state via the same accessors as the YouTube migration.

Pinning tests updated to assert engine state. Suite still green
(313 download tests). Behavior preserved end-to-end.
pull/495/head
Broque Thomas 3 weeks ago
parent 4ddfb01a0a
commit 73fb60a68a

@ -116,12 +116,21 @@ class TidalDownloadClient:
self.session: Optional['tidalapi.Session'] = None
self._init_session()
self.active_downloads: Dict[str, Dict[str, Any]] = {}
self._download_lock = threading.Lock()
self._device_auth_future = None
self._device_auth_link = None
# Engine reference is populated by set_engine() at registration
# time. Until then dispatch returns None — orchestrator wires
# this immediately so the only None case is tests that bypass
# the orchestrator.
self._engine = None
def set_engine(self, engine):
"""Engine callback — gives the client access to the central
thread worker + state store. Engine calls this during
``register_plugin`` if the plugin defines it."""
self._engine = engine
def set_shutdown_check(self, check_callable):
self.shutdown_check = check_callable
@ -604,85 +613,33 @@ class TidalDownloadClient:
) from exc
async def download(self, username: str, filename: str, file_size: int = 0) -> Optional[str]:
try:
if '||' not in filename:
logger.error(f"Invalid filename format: {filename}")
return None
track_id_str, display_name = filename.split('||', 1)
try:
track_id = int(track_id_str)
except ValueError:
logger.error(f"Invalid Tidal track ID: {track_id_str}")
return None
logger.info(f"Starting Tidal download: {display_name}")
download_id = str(uuid.uuid4())
with self._download_lock:
self.active_downloads[download_id] = {
'id': download_id,
'filename': filename,
'username': 'tidal',
'state': 'Initializing',
'progress': 0.0,
'size': 0,
'transferred': 0,
'speed': 0,
'time_remaining': None,
'track_id': track_id,
'display_name': display_name,
'file_path': None,
}
download_thread = threading.Thread(
target=self._download_thread_worker,
args=(download_id, track_id, display_name, filename),
daemon=True,
)
download_thread.start()
logger.info(f"Tidal download {download_id} started in background")
return download_id
except Exception as e:
logger.error(f"Failed to start Tidal download: {e}")
import traceback
traceback.print_exc()
if '||' not in filename:
logger.error(f"Invalid filename format: {filename}")
return None
if self._engine is None:
logger.error("Tidal client has no engine reference — cannot dispatch download")
return None
def _download_thread_worker(self, download_id: str, track_id: int, display_name: str, original_filename: str):
track_id_str, display_name = filename.split('||', 1)
try:
with self._download_lock:
if download_id in self.active_downloads:
self.active_downloads[download_id]['state'] = 'InProgress, Downloading'
file_path = self._download_sync(download_id, track_id, display_name)
if file_path:
with self._download_lock:
if download_id in self.active_downloads:
self.active_downloads[download_id]['state'] = 'Completed, Succeeded'
self.active_downloads[download_id]['progress'] = 100.0
self.active_downloads[download_id]['file_path'] = file_path
logger.info(f"Tidal download {download_id} completed: {file_path}")
else:
with self._download_lock:
if download_id in self.active_downloads:
self.active_downloads[download_id]['state'] = 'Errored'
logger.error(f"Tidal download {download_id} failed")
except Exception as e:
logger.error(f"Tidal download thread failed for {download_id}: {e}")
import traceback
traceback.print_exc()
track_id = int(track_id_str)
except ValueError:
logger.error(f"Invalid Tidal track ID: {track_id_str}")
return None
with self._download_lock:
if download_id in self.active_downloads:
self.active_downloads[download_id]['state'] = 'Errored'
logger.info(f"Starting Tidal download: {display_name}")
return self._engine.worker.dispatch(
source_name='tidal',
target_id=track_id,
display_name=display_name,
original_filename=filename,
impl_callable=self._download_sync,
extra_record_fields={
'track_id': track_id,
'display_name': display_name,
},
)
def _download_sync(self, download_id: str, track_id: int, display_name: str) -> Optional[str]:
if not self.session or not self.session.check_login():
@ -727,9 +684,8 @@ class TidalDownloadClient:
speed_start = time.time()
segments_completed = 0
with self._download_lock:
if download_id in self.active_downloads:
self.active_downloads[download_id]['size'] = 0
if self._engine is not None:
self._engine.update_record('tidal', download_id, {'size': 0})
with intermediate_path.open('wb') as output_file:
if init_uri:
@ -828,97 +784,82 @@ class TidalDownloadClient:
def _update_download_progress(self, download_id: str, downloaded: int,
segments_completed: int, total_segments: int,
speed_start: float):
with self._download_lock:
if download_id not in self.active_downloads:
return
info = self.active_downloads[download_id]
info['transferred'] = downloaded
now = time.time()
elapsed_total = now - speed_start
speed = int(downloaded / elapsed_total) if elapsed_total > 0 else 0
info['speed'] = speed
if total_segments > 0:
progress = (segments_completed / total_segments) * 100
info['progress'] = round(min(progress, 99.9), 1)
time_remaining = None
if speed > 0:
remaining_bytes = downloaded * (total_segments / max(segments_completed, 1)) - downloaded
if remaining_bytes > 0:
time_remaining = int(remaining_bytes / speed)
info['time_remaining'] = time_remaining
if self._engine is None:
return
record = self._engine.get_record('tidal', download_id)
if record is None:
return
async def get_all_downloads(self) -> List[DownloadStatus]:
download_statuses = []
with self._download_lock:
for _download_id, info in self.active_downloads.items():
status = DownloadStatus(
id=info['id'],
filename=info['filename'],
username=info['username'],
state=info['state'],
progress=info['progress'],
size=info['size'],
transferred=info['transferred'],
speed=info['speed'],
time_remaining=info.get('time_remaining'),
file_path=info.get('file_path'),
)
download_statuses.append(status)
now = time.time()
elapsed_total = now - speed_start
speed = int(downloaded / elapsed_total) if elapsed_total > 0 else 0
progress = record.get('progress', 0.0)
if total_segments > 0:
progress = round(min((segments_completed / total_segments) * 100, 99.9), 1)
time_remaining = None
if speed > 0:
remaining_bytes = downloaded * (total_segments / max(segments_completed, 1)) - downloaded
if remaining_bytes > 0:
time_remaining = int(remaining_bytes / speed)
self._engine.update_record('tidal', download_id, {
'transferred': downloaded,
'speed': speed,
'progress': progress,
'time_remaining': time_remaining,
})
return download_statuses
def _record_to_status(self, record):
return DownloadStatus(
id=record['id'],
filename=record['filename'],
username=record['username'],
state=record['state'],
progress=record['progress'],
size=record.get('size', 0),
transferred=record.get('transferred', 0),
speed=record.get('speed', 0),
time_remaining=record.get('time_remaining'),
file_path=record.get('file_path'),
)
async def get_download_status(self, download_id: str) -> Optional[DownloadStatus]:
with self._download_lock:
if download_id not in self.active_downloads:
return None
async def get_all_downloads(self) -> List[DownloadStatus]:
if self._engine is None:
return []
return [
self._record_to_status(record)
for record in self._engine.iter_records_for_source('tidal')
]
info = self.active_downloads[download_id]
return DownloadStatus(
id=info['id'],
filename=info['filename'],
username=info['username'],
state=info['state'],
progress=info['progress'],
size=info['size'],
transferred=info['transferred'],
speed=info['speed'],
time_remaining=info.get('time_remaining'),
file_path=info.get('file_path'),
)
async def get_download_status(self, download_id: str) -> Optional[DownloadStatus]:
if self._engine is None:
return None
record = self._engine.get_record('tidal', download_id)
return self._record_to_status(record) if record is not None else None
async def cancel_download(self, download_id: str, username: str = None, remove: bool = False) -> bool:
try:
with self._download_lock:
if download_id not in self.active_downloads:
logger.warning(f"Download {download_id} not found")
return False
self.active_downloads[download_id]['state'] = 'Cancelled'
logger.info(f"Marked Tidal download {download_id} as cancelled")
if remove:
del self.active_downloads[download_id]
logger.info(f"Removed Tidal download {download_id} from queue")
return True
except Exception as e:
logger.error(f"Failed to cancel download {download_id}: {e}")
if self._engine is None:
return False
if self._engine.get_record('tidal', download_id) is None:
logger.warning(f"Tidal download {download_id} not found")
return False
self._engine.update_record('tidal', download_id, {'state': 'Cancelled'})
logger.info(f"Marked Tidal download {download_id} as cancelled")
if remove:
self._engine.remove_record('tidal', download_id)
logger.info(f"Removed Tidal download {download_id} from queue")
return True
async def clear_all_completed_downloads(self) -> bool:
if self._engine is None:
return True
try:
with self._download_lock:
ids_to_remove = [
did for did, info in self.active_downloads.items()
if info.get('state', '') in ('Completed, Succeeded', 'Cancelled', 'Errored', 'Aborted')
]
for did in ids_to_remove:
del self.active_downloads[did]
terminal = {'Completed, Succeeded', 'Cancelled', 'Errored', 'Aborted'}
for record in list(self._engine.iter_records_for_source('tidal')):
if record.get('state') in terminal:
self._engine.remove_record('tidal', record['id'])
return True
except Exception as e:
logger.error(f"Error clearing downloads: {e}")

@ -1,11 +1,9 @@
"""Phase A pinning tests for TidalDownloadClient's download lifecycle.
Tidal authenticates via tidalapi OAuth, fetches HLS manifests for a
track_id, demuxes the FLAC stream from MP4 container with ffmpeg,
and writes the result to disk. The thread worker + state-dict
pattern is identical to YouTube's — Phase C will lift both into
the engine. These tests pin the SHAPE of the per-download record
and the filename encoding so the lift can't drift the contract.
"""Phase A pinning tests for TidalDownloadClient — UPDATED for Phase C3.
Post-C3 the client no longer owns its own ``active_downloads`` dict
or thread spawn both moved into the engine's BackgroundDownloadWorker.
Pinning tests now read state from ``engine.get_record('tidal', ...)``
instead of ``client.active_downloads[...]``.
"""
from __future__ import annotations
@ -17,7 +15,7 @@ from unittest.mock import patch
import pytest
# tidalapi may not be importable; tidal_download_client guards for that.
from core.download_engine import DownloadEngine
from core.tidal_download_client import TidalDownloadClient
@ -30,19 +28,18 @@ def _run_async(coro):
@pytest.fixture
def tidal_client():
"""A bare TidalDownloadClient — bypasses tidalapi.Session init.
Tests that need an authenticated state set client.session.check_login
via mock."""
def tidal_client_with_engine():
client = TidalDownloadClient.__new__(TidalDownloadClient)
client.download_path = Path('./test_tidal_downloads')
client.shutdown_check = None
client.session = None
client.active_downloads = {}
client._download_lock = threading.Lock()
client._device_auth_future = None
client._device_auth_link = None
return client
client._engine = None
engine = DownloadEngine()
client.set_engine(engine)
return client, engine
# ---------------------------------------------------------------------------
@ -50,21 +47,18 @@ def tidal_client():
# ---------------------------------------------------------------------------
def test_is_authenticated_false_when_no_session(tidal_client):
"""Pinning: no session → not authenticated. Used by orchestrator
fallback to skip Tidal when user hasn't logged in."""
assert tidal_client.is_authenticated() is False
def test_is_authenticated_false_when_no_session(tidal_client_with_engine):
client, _ = tidal_client_with_engine
assert client.is_authenticated() is False
def test_is_authenticated_false_when_session_check_login_raises(tidal_client):
"""Pinning: tidalapi.Session.check_login() can raise on expired
tokens. Client swallows + reports False orchestrator skip
behavior depends on this."""
def test_is_authenticated_false_when_session_check_login_raises(tidal_client_with_engine):
client, _ = tidal_client_with_engine
fake_session = type('FakeSession', (), {
'check_login': lambda self: (_ for _ in ()).throw(RuntimeError("expired")),
})()
tidal_client.session = fake_session
assert tidal_client.is_authenticated() is False
client.session = fake_session
assert client.is_authenticated() is False
# ---------------------------------------------------------------------------
@ -72,102 +66,102 @@ def test_is_authenticated_false_when_session_check_login_raises(tidal_client):
# ---------------------------------------------------------------------------
def test_download_returns_none_for_invalid_filename_format(tidal_client):
"""Pinning: Tidal encodes search results as `track_id||display`.
Missing `||` None (not exception)."""
result = _run_async(tidal_client.download('tidal', 'no-separator', 0))
def test_download_returns_none_for_invalid_filename_format(tidal_client_with_engine):
client, _ = tidal_client_with_engine
result = _run_async(client.download('tidal', 'no-separator', 0))
assert result is None
def test_download_returns_none_for_non_integer_track_id(tidal_client):
"""Pinning: track_id portion MUST parse as int. Tidal API uses
integer track IDs. Non-int None (not exception)."""
result = _run_async(tidal_client.download('tidal', 'not-a-number||some title', 0))
def test_download_returns_none_for_non_integer_track_id(tidal_client_with_engine):
client, _ = tidal_client_with_engine
result = _run_async(client.download('tidal', 'not-int||title', 0))
assert result is None
def test_download_returns_uuid_for_valid_filename(tidal_client):
"""Pinning: valid `<int>||display` filename returns a UUID
download_id immediately; download runs in background thread."""
with patch('core.tidal_download_client.threading.Thread') as fake_thread_cls:
fake_thread_cls.return_value.start = lambda: None
result = _run_async(tidal_client.download('tidal', '12345||Some Song', 0))
def test_download_returns_none_when_engine_not_wired():
client = TidalDownloadClient.__new__(TidalDownloadClient)
client._engine = None
result = _run_async(client.download('tidal', '12345||x', 0))
assert result is None
def test_download_returns_uuid_for_valid_filename(tidal_client_with_engine):
client, _ = tidal_client_with_engine
with patch.object(client, '_download_sync', return_value='/tmp/x.flac'):
result = _run_async(client.download('tidal', '12345||Some Song', 0))
assert result is not None
assert len(result) == 36 # UUID4 format
def test_download_populates_active_downloads_with_initial_state(tidal_client):
"""Pinning: per-download record schema. Engine refactor moves
this dict into central state but the SHAPE must stay the same
for status APIs / frontend / post-processing consumers."""
with patch('core.tidal_download_client.threading.Thread') as fake_thread_cls:
fake_thread_cls.return_value.start = lambda: None
download_id = _run_async(
tidal_client.download('tidal', '999||My Tidal Song', 0)
)
record = tidal_client.active_downloads[download_id]
assert record['id'] == download_id
assert record['filename'] == '999||My Tidal Song' # ORIGINAL encoded form
assert record['username'] == 'tidal'
assert record['state'] == 'Initializing'
assert record['progress'] == 0.0
assert record['size'] == 0 # filled in by worker once HLS manifest fetched
assert record['track_id'] == 999 # parsed as int
assert record['display_name'] == 'My Tidal Song'
assert record['file_path'] is None
def test_download_spawns_daemon_thread_targeting_worker(tidal_client):
"""Pinning: daemon thread targeting `_download_thread_worker`
with (download_id, track_id, display_name, original_filename).
Phase C replaces this with `engine.dispatch_download(plugin, ...)`
that calls `plugin._download_impl(track_id)`."""
captured_kwargs = {}
def capture_thread(*args, **kwargs):
captured_kwargs.update(kwargs)
return type('FakeThread', (), {'start': lambda self: None})()
with patch('core.tidal_download_client.threading.Thread', side_effect=capture_thread):
_run_async(tidal_client.download('tidal', '777||Title', 0))
assert captured_kwargs.get('daemon') is True
assert captured_kwargs.get('target') == tidal_client._download_thread_worker
args = captured_kwargs.get('args', ())
assert len(args) == 4
# Args: (download_id, track_id, display_name, original_filename)
assert args[1] == 777 # track_id parsed as int
assert args[2] == 'Title'
assert args[3] == '777||Title' # original encoded filename
assert len(result) == 36
def test_download_populates_engine_record_with_initial_state(tidal_client_with_engine):
client, engine = tidal_client_with_engine
started = threading.Event()
release = threading.Event()
def slow_impl(*args, **kwargs):
started.set()
release.wait(timeout=1.0)
return '/tmp/done.flac'
with patch.object(client, '_download_sync', side_effect=slow_impl):
download_id = _run_async(client.download('tidal', '999||My Tidal Song', 0))
started.wait(timeout=1.0)
record = engine.get_record('tidal', download_id)
assert record is not None
assert record['id'] == download_id
assert record['filename'] == '999||My Tidal Song'
assert record['username'] == 'tidal'
assert record['state'] in ('Initializing', 'InProgress, Downloading')
assert record['progress'] == 0.0
assert record['track_id'] == 999 # parsed as int
assert record['display_name'] == 'My Tidal Song'
assert record['file_path'] is None
release.set()
# ---------------------------------------------------------------------------
# get_all_downloads()
# Query / cancel — engine-backed reads
# ---------------------------------------------------------------------------
def test_get_all_downloads_iterates_active_downloads(tidal_client):
"""Pinning: returns one DownloadStatus per entry in
active_downloads. Engine refactor will replace this with a
central query the per-record-to-DownloadStatus translation
must preserve the field mapping."""
tidal_client.active_downloads = {
'dl-1': {
'id': 'dl-1', 'filename': '111||Song A', 'username': 'tidal',
'state': 'InProgress, Downloading', 'progress': 50.0,
'size': 1000, 'transferred': 500, 'speed': 100,
'time_remaining': None,
},
'dl-2': {
'id': 'dl-2', 'filename': '222||Song B', 'username': 'tidal',
'state': 'Completed, Succeeded', 'progress': 100.0,
'size': 2000, 'transferred': 2000, 'speed': 0,
'time_remaining': None,
},
}
result = _run_async(tidal_client.get_all_downloads())
def test_get_all_downloads_reads_engine_records(tidal_client_with_engine):
client, engine = tidal_client_with_engine
engine.add_record('tidal', 'dl-1', {
'id': 'dl-1', 'filename': '111||Song A', 'username': 'tidal',
'state': 'InProgress, Downloading', 'progress': 50.0,
'size': 1000, 'transferred': 500, 'speed': 100,
})
engine.add_record('tidal', 'dl-2', {
'id': 'dl-2', 'filename': '222||Song B', 'username': 'tidal',
'state': 'Completed, Succeeded', 'progress': 100.0,
'size': 2000, 'transferred': 2000, 'speed': 0,
})
result = _run_async(client.get_all_downloads())
assert len(result) == 2
assert {r.id for r in result} == {'dl-1', 'dl-2'}
assert {r.username for r in result} == {'tidal'}
def test_cancel_download_marks_cancelled(tidal_client_with_engine):
client, engine = tidal_client_with_engine
engine.add_record('tidal', 'dl-1', {'id': 'dl-1', 'state': 'InProgress, Downloading'})
ok = _run_async(client.cancel_download('dl-1', None, remove=False))
assert ok is True
assert engine.get_record('tidal', 'dl-1')['state'] == 'Cancelled'
ok = _run_async(client.cancel_download('dl-1', None, remove=True))
assert ok is True
assert engine.get_record('tidal', 'dl-1') is None
def test_clear_all_completed_drops_only_terminal_records(tidal_client_with_engine):
client, engine = tidal_client_with_engine
engine.add_record('tidal', 'done', {'id': 'done', 'state': 'Completed, Succeeded'})
engine.add_record('tidal', 'live', {'id': 'live', 'state': 'InProgress, Downloading'})
_run_async(client.clear_all_completed_downloads())
assert engine.get_record('tidal', 'done') is None
assert engine.get_record('tidal', 'live') is not None

Loading…
Cancel
Save