mirror of https://github.com/Nezreka/SoulSync.git
Add Qobuz playlist sync to Sync page (#677)
Qobuz joins Tidal and Deezer as a first-class playlist sync source. New Qobuz tab on the Sync page lists user playlists + a virtual Favorite Tracks entry, and clicks route through the same discovery → sync → download pipeline the other services already use. Backend: * core/qobuz_client.py — new get_user_playlists, get_playlist, get_user_favorite_tracks, get_user_favorite_tracks_count. Returns normalized dicts (matches Deezer client shape, not Tidal's dataclasses) so the discovery worker can iterate directly without duck-typing. Virtual `qobuz-favorites` ID dispatches to favorites fetcher inside get_playlist — same trick Tidal uses with COLLECTION_PLAYLIST_ID. Both list endpoints paginate against Qobuz's 500-cap limit. * core/discovery/qobuz.py — new worker module. Mirrors core/discovery/deezer.py: pause enrichment, iterate tracks, hit discovery cache, fall back to _search_spotify_for_tidal_track, build wing-it stub on miss, sync results to mirrored playlist. * web_server.py — adds /api/qobuz/playlists, /playlist/<id>, /discovery/start/<id>, /discovery/status/<id>, /discovery/update_match, /playlists/states, /state/<id>, /reset/<id>, /delete/<id>, /update_phase/<id>, /sync/start/<id>, /sync/status/<id>, /sync/cancel/<id>. One-for-one with the Tidal + Deezer endpoint sets. Qobuz discovery executor registered for clean shutdown. Frontend: * webui/static/sync-services.js — full handler set (loadQobuzPlaylists, createQobuzCard, openQobuzDiscoveryModal, startQobuzDiscoveryPolling, startQobuzPlaylistSync, startQobuzSyncPolling, cancelQobuzSync, startQobuzDownloadMissing, rehydrateQobuzDownloadModal, etc.). Reuses the shared YouTube discovery modal via fake `qobuz_<id>` urlHash and is_qobuz_playlist flag. Shared switch statements in getModalActionButtons / generateTableRowsFromState / Wing It helpers in downloads.js gain new isQobuz branches alongside the existing per-service ones. * webui/index.html — new Qobuz tab button + content div, slotted between Deezer and Deezer Link. * webui/static/style.css — new .qobuz-icon for the tab icon. * webui/static/core.js — qobuzPlaylists / qobuzPlaylistStates / qobuzPlaylistsLoaded globals. Followed the existing per-service pattern verbatim rather than refactoring the duplicated transformers across Tidal / Deezer / Spotify-public / YouTube / Mirrored — that refactor is its own follow-up PR per the "don't break Tidal/Deezer" scope discipline. Adding the 6th copy of a proven pattern is lower risk than collapsing 5 working services behind a new abstraction. Tests: * tests/test_qobuz_playlists.py — 12 tests covering pagination, normalization, favorites virtual-ID routing, artist-name fallback chain (performer → album.artist → 'Unknown Artist'), and unauthenticated short-circuits.pull/685/head
parent
eba7f61e04
commit
a34eae1445
@ -0,0 +1,299 @@
|
||||
"""Background worker for Qobuz playlist discovery.
|
||||
|
||||
`run_qobuz_discovery_worker(playlist_id, deps)` is the function the
|
||||
Qobuz discovery start-endpoint submits to its executor to match each
|
||||
Qobuz playlist track against Spotify (preferred) or the configured
|
||||
fallback metadata source (iTunes / Deezer / Discogs / MusicBrainz).
|
||||
|
||||
Mirrors `core/discovery/deezer.py` exactly — Qobuz playlists arrive as
|
||||
dicts (not dataclasses) from `core/qobuz_client.py:get_playlist`, so
|
||||
this worker uses dict-style access on track data and wraps each entry
|
||||
in a SimpleNamespace before handing it to the shared
|
||||
`_search_spotify_for_tidal_track` helper.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
import types
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Callable
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class QobuzDiscoveryDeps:
|
||||
"""Bundle of cross-cutting deps the Qobuz discovery worker needs."""
|
||||
qobuz_discovery_states: dict
|
||||
spotify_client: Any
|
||||
pause_enrichment_workers: Callable[[str], dict]
|
||||
resume_enrichment_workers: Callable[[dict, str], None]
|
||||
get_active_discovery_source: Callable[[], str]
|
||||
get_metadata_fallback_client: Callable[[], Any]
|
||||
get_discovery_cache_key: Callable
|
||||
get_database: Callable[[], Any]
|
||||
validate_discovery_cache_artist: Callable
|
||||
search_spotify_for_tidal_track: Callable
|
||||
build_discovery_wing_it_stub: Callable
|
||||
add_activity_item: Callable
|
||||
sync_discovery_results_to_mirrored: Callable
|
||||
|
||||
|
||||
def run_qobuz_discovery_worker(playlist_id, deps: QobuzDiscoveryDeps):
|
||||
"""Background worker for Qobuz discovery process (Spotify preferred, fallback metadata source)."""
|
||||
_ew_state = {}
|
||||
try:
|
||||
_ew_state = deps.pause_enrichment_workers('Qobuz discovery')
|
||||
state = deps.qobuz_discovery_states[playlist_id]
|
||||
playlist = state['playlist']
|
||||
|
||||
# Determine which provider to use
|
||||
discovery_source = deps.get_active_discovery_source()
|
||||
use_spotify = (discovery_source == 'spotify') and deps.spotify_client and deps.spotify_client.is_spotify_authenticated()
|
||||
|
||||
# Initialize fallback client if needed
|
||||
itunes_client_instance = None
|
||||
if not use_spotify:
|
||||
itunes_client_instance = deps.get_metadata_fallback_client()
|
||||
|
||||
logger.info(f"Starting Qobuz discovery for: {playlist['name']} (using {discovery_source.upper()})")
|
||||
|
||||
# Store discovery source in state for frontend
|
||||
state['discovery_source'] = discovery_source
|
||||
|
||||
successful_discoveries = 0
|
||||
tracks = playlist['tracks']
|
||||
|
||||
for i, qobuz_track in enumerate(tracks):
|
||||
if state.get('cancelled', False):
|
||||
break
|
||||
|
||||
try:
|
||||
track_name = qobuz_track['name']
|
||||
track_artists = qobuz_track['artists']
|
||||
track_id = qobuz_track['id']
|
||||
track_album = qobuz_track.get('album', '')
|
||||
track_duration_ms = qobuz_track.get('duration_ms', 0)
|
||||
|
||||
logger.info(f"[{i+1}/{len(tracks)}] Searching {discovery_source.upper()}: {track_name} by {', '.join(track_artists)}")
|
||||
|
||||
# Check discovery cache first
|
||||
cache_key = deps.get_discovery_cache_key(track_name, track_artists[0] if track_artists else '')
|
||||
try:
|
||||
cache_db = deps.get_database()
|
||||
cached_match = cache_db.get_discovery_cache_match(cache_key[0], cache_key[1], discovery_source)
|
||||
if cached_match and deps.validate_discovery_cache_artist(track_artists[0] if track_artists else '', cached_match):
|
||||
logger.debug(f"CACHE HIT [{i+1}/{len(tracks)}]: {track_name} by {', '.join(track_artists)}")
|
||||
cached_artists = cached_match.get('artists', [])
|
||||
if cached_artists:
|
||||
cached_artist_str = ', '.join(
|
||||
a if isinstance(a, str) else a.get('name', '') for a in cached_artists
|
||||
)
|
||||
else:
|
||||
cached_artist_str = ''
|
||||
cached_album = cached_match.get('album', '')
|
||||
if isinstance(cached_album, dict):
|
||||
cached_album = cached_album.get('name', '')
|
||||
|
||||
result = {
|
||||
'qobuz_track': {
|
||||
'id': track_id,
|
||||
'name': track_name,
|
||||
'artists': track_artists or [],
|
||||
'album': track_album,
|
||||
'duration_ms': track_duration_ms,
|
||||
},
|
||||
'spotify_data': cached_match,
|
||||
'match_data': cached_match,
|
||||
'status': 'Found',
|
||||
'status_class': 'found',
|
||||
'spotify_track': cached_match.get('name', ''),
|
||||
'spotify_artist': cached_artist_str,
|
||||
'spotify_album': cached_album,
|
||||
'spotify_id': cached_match.get('id', ''),
|
||||
'discovery_source': discovery_source,
|
||||
'index': i
|
||||
}
|
||||
successful_discoveries += 1
|
||||
state['spotify_matches'] = successful_discoveries
|
||||
state['discovery_results'].append(result)
|
||||
state['discovery_progress'] = int(((i + 1) / len(tracks)) * 100)
|
||||
continue
|
||||
except Exception as cache_err:
|
||||
logger.error(f"Cache lookup error: {cache_err}")
|
||||
|
||||
# SimpleNamespace duck-type for _search_spotify_for_tidal_track
|
||||
track_ns = types.SimpleNamespace(
|
||||
id=track_id,
|
||||
name=track_name,
|
||||
artists=track_artists,
|
||||
album=track_album,
|
||||
duration_ms=track_duration_ms
|
||||
)
|
||||
|
||||
track_result = deps.search_spotify_for_tidal_track(
|
||||
track_ns,
|
||||
use_spotify=use_spotify,
|
||||
itunes_client=itunes_client_instance
|
||||
)
|
||||
|
||||
result = {
|
||||
'qobuz_track': {
|
||||
'id': track_id,
|
||||
'name': track_name,
|
||||
'artists': track_artists or [],
|
||||
'album': track_album,
|
||||
'duration_ms': track_duration_ms,
|
||||
},
|
||||
'spotify_data': None,
|
||||
'match_data': None,
|
||||
'status': 'Not Found',
|
||||
'status_class': 'not-found',
|
||||
'spotify_track': '',
|
||||
'spotify_artist': '',
|
||||
'spotify_album': '',
|
||||
'discovery_source': discovery_source
|
||||
}
|
||||
|
||||
match_confidence = 0.0
|
||||
|
||||
if use_spotify and isinstance(track_result, tuple):
|
||||
track_obj, raw_track_data, match_confidence = track_result
|
||||
album_obj = raw_track_data.get('album', {}) if raw_track_data else {}
|
||||
if isinstance(album_obj, dict) and not album_obj.get('name') and track_obj.album:
|
||||
album_obj['name'] = track_obj.album
|
||||
elif not album_obj and track_obj.album:
|
||||
album_obj = {'name': track_obj.album}
|
||||
if isinstance(album_obj, dict) and not album_obj.get('release_date'):
|
||||
album_obj['release_date'] = getattr(track_obj, 'release_date', '') or ''
|
||||
_album_images = album_obj.get('images', []) if isinstance(album_obj, dict) else []
|
||||
_image_url = _album_images[0].get('url', '') if _album_images else (getattr(track_obj, 'image_url', '') or '')
|
||||
|
||||
match_data = {
|
||||
'id': track_obj.id,
|
||||
'name': track_obj.name,
|
||||
'artists': track_obj.artists,
|
||||
'album': album_obj,
|
||||
'duration_ms': track_obj.duration_ms,
|
||||
'external_urls': track_obj.external_urls,
|
||||
'image_url': _image_url,
|
||||
'source': 'spotify'
|
||||
}
|
||||
if raw_track_data and raw_track_data.get('track_number'):
|
||||
match_data['track_number'] = raw_track_data['track_number']
|
||||
if raw_track_data and raw_track_data.get('disc_number'):
|
||||
match_data['disc_number'] = raw_track_data['disc_number']
|
||||
result['spotify_data'] = match_data
|
||||
result['match_data'] = match_data
|
||||
result['status'] = 'Found'
|
||||
result['status_class'] = 'found'
|
||||
result['spotify_track'] = track_obj.name
|
||||
result['spotify_artist'] = ', '.join(track_obj.artists) if isinstance(track_obj.artists, list) else str(track_obj.artists)
|
||||
result['spotify_album'] = album_obj.get('name', '') if isinstance(album_obj, dict) else str(album_obj)
|
||||
result['spotify_id'] = track_obj.id
|
||||
result['confidence'] = match_confidence
|
||||
successful_discoveries += 1
|
||||
state['spotify_matches'] = successful_discoveries
|
||||
|
||||
elif not use_spotify and track_result and isinstance(track_result, dict):
|
||||
match_confidence = track_result.pop('confidence', 0.80)
|
||||
match_data = track_result
|
||||
match_data['source'] = discovery_source
|
||||
_fb_album = match_data.get('album', {})
|
||||
_fb_images = _fb_album.get('images', []) if isinstance(_fb_album, dict) else []
|
||||
if _fb_images and 'image_url' not in match_data:
|
||||
match_data['image_url'] = _fb_images[0].get('url', '')
|
||||
result['spotify_data'] = match_data
|
||||
result['match_data'] = match_data
|
||||
result['status'] = 'Found'
|
||||
result['status_class'] = 'found'
|
||||
result['spotify_track'] = match_data.get('name', '')
|
||||
itunes_artists = match_data.get('artists', [])
|
||||
result['spotify_artist'] = ', '.join(a if isinstance(a, str) else a.get('name', '') for a in itunes_artists) if itunes_artists else ''
|
||||
result['spotify_album'] = match_data.get('album', {}).get('name', '') if isinstance(match_data.get('album'), dict) else match_data.get('album', '')
|
||||
result['spotify_id'] = match_data.get('id', '')
|
||||
result['confidence'] = match_confidence
|
||||
successful_discoveries += 1
|
||||
state['spotify_matches'] = successful_discoveries
|
||||
|
||||
# Save to discovery cache if match found
|
||||
if result['status_class'] == 'found' and result.get('match_data'):
|
||||
try:
|
||||
cache_db = deps.get_database()
|
||||
cache_db.save_discovery_cache_match(
|
||||
cache_key[0], cache_key[1], discovery_source, match_confidence,
|
||||
result['match_data'], track_name,
|
||||
track_artists[0] if track_artists else ''
|
||||
)
|
||||
logger.info(f"CACHE SAVED: {track_name} (confidence: {match_confidence:.3f})")
|
||||
except Exception as cache_err:
|
||||
logger.error(f"Cache save error: {cache_err}")
|
||||
|
||||
# Auto Wing It fallback for unmatched tracks
|
||||
if result['status_class'] == 'not-found':
|
||||
qobuz_t = result.get('qobuz_track', {})
|
||||
stub = deps.build_discovery_wing_it_stub(
|
||||
qobuz_t.get('name', ''),
|
||||
', '.join(qobuz_t.get('artists', [])),
|
||||
qobuz_t.get('duration_ms', 0)
|
||||
)
|
||||
result['status'] = 'Wing It'
|
||||
result['status_class'] = 'wing-it'
|
||||
result['spotify_data'] = stub
|
||||
result['match_data'] = stub
|
||||
result['spotify_track'] = qobuz_t.get('name', '')
|
||||
result['spotify_artist'] = ', '.join(qobuz_t.get('artists', []))
|
||||
result['wing_it_fallback'] = True
|
||||
result['confidence'] = 0
|
||||
successful_discoveries += 1
|
||||
state['spotify_matches'] = successful_discoveries
|
||||
state['wing_it_count'] = state.get('wing_it_count', 0) + 1
|
||||
|
||||
result['index'] = i
|
||||
state['discovery_results'].append(result)
|
||||
state['discovery_progress'] = int(((i + 1) / len(tracks)) * 100)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing track {i+1}: {e}")
|
||||
result = {
|
||||
'qobuz_track': {
|
||||
'name': qobuz_track.get('name', 'Unknown'),
|
||||
'artists': qobuz_track.get('artists', []),
|
||||
},
|
||||
'spotify_data': None,
|
||||
'match_data': None,
|
||||
'status': 'Error',
|
||||
'status_class': 'error',
|
||||
'spotify_track': '',
|
||||
'spotify_artist': '',
|
||||
'spotify_album': '',
|
||||
'error': str(e),
|
||||
'discovery_source': discovery_source,
|
||||
'index': i
|
||||
}
|
||||
state['discovery_results'].append(result)
|
||||
state['discovery_progress'] = int(((i + 1) / len(tracks)) * 100)
|
||||
|
||||
# Mark as complete
|
||||
state['phase'] = 'discovered'
|
||||
state['status'] = 'discovered'
|
||||
state['discovery_progress'] = 100
|
||||
|
||||
source_label = discovery_source.upper()
|
||||
deps.add_activity_item("", f"Qobuz Discovery Complete ({source_label})", f"'{playlist['name']}' - {successful_discoveries}/{len(tracks)} tracks found", "Now")
|
||||
|
||||
logger.info(f"Qobuz discovery complete ({source_label}): {successful_discoveries}/{len(tracks)} tracks found")
|
||||
|
||||
deps.sync_discovery_results_to_mirrored('qobuz', playlist_id, state.get('discovery_results', []), discovery_source, profile_id=state.get('_profile_id', 1))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in Qobuz discovery worker: {e}")
|
||||
if playlist_id in deps.qobuz_discovery_states:
|
||||
deps.qobuz_discovery_states[playlist_id]['phase'] = 'error'
|
||||
deps.qobuz_discovery_states[playlist_id]['status'] = f'error: {str(e)}'
|
||||
finally:
|
||||
deps.resume_enrichment_workers(_ew_state, 'Qobuz discovery')
|
||||
@ -0,0 +1,402 @@
|
||||
"""Unit tests for QobuzClient playlist + favorites methods.
|
||||
|
||||
Covers the Sync-page parity added for github issue #677:
|
||||
- `get_user_playlists` paginates + normalizes the playlist list
|
||||
- `get_playlist` paginates the tracklist + normalizes track shape
|
||||
- `get_playlist` recognizes the virtual `qobuz-favorites` ID and
|
||||
dispatches to `get_user_favorite_tracks` (same pattern as Tidal's
|
||||
COLLECTION_PLAYLIST_ID)
|
||||
- `get_user_favorite_tracks_count` reads the cheap count-only path
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import types
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def qobuz_client_module():
|
||||
"""Import core.qobuz_client with config_manager stubbed to a mutable
|
||||
in-memory dict. Snapshots and restores sys.modules entries on
|
||||
teardown so downstream tests still see the real config.
|
||||
"""
|
||||
config_state: Dict[str, Any] = {}
|
||||
|
||||
class _StubConfigManager:
|
||||
def get(self, key, default=None):
|
||||
cur: Any = config_state
|
||||
for part in key.split('.'):
|
||||
if isinstance(cur, dict) and part in cur:
|
||||
cur = cur[part]
|
||||
else:
|
||||
return default
|
||||
return cur
|
||||
|
||||
def set(self, key, value):
|
||||
cur: Any = config_state
|
||||
parts = key.split('.')
|
||||
for part in parts[:-1]:
|
||||
cur = cur.setdefault(part, {})
|
||||
cur[parts[-1]] = value
|
||||
|
||||
original_modules = {
|
||||
name: sys.modules.get(name)
|
||||
for name in ('config', 'config.settings', 'core.qobuz_client')
|
||||
}
|
||||
|
||||
if 'config' not in sys.modules:
|
||||
sys.modules['config'] = types.ModuleType('config')
|
||||
settings_mod = types.ModuleType('config.settings')
|
||||
settings_mod.config_manager = _StubConfigManager()
|
||||
sys.modules['config.settings'] = settings_mod
|
||||
|
||||
sys.modules.pop('core.qobuz_client', None)
|
||||
try:
|
||||
import core.qobuz_client as qobuz_client_module
|
||||
yield qobuz_client_module, config_state
|
||||
finally:
|
||||
for name, original in original_modules.items():
|
||||
if original is None:
|
||||
sys.modules.pop(name, None)
|
||||
else:
|
||||
sys.modules[name] = original
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def authed_client(qobuz_client_module):
|
||||
"""A QobuzClient with stub credentials so is_authenticated() returns True."""
|
||||
module, config = qobuz_client_module
|
||||
config['qobuz'] = {
|
||||
'session': {
|
||||
'app_id': 'APP-1',
|
||||
'app_secret': 'SECRET-1',
|
||||
'user_auth_token': 'TOKEN-1',
|
||||
}
|
||||
}
|
||||
client = module.QobuzClient()
|
||||
client.reload_credentials()
|
||||
assert client.is_authenticated() is True
|
||||
return client
|
||||
|
||||
|
||||
def _install_api_responder(client, responder):
|
||||
"""Replace `_api_request` with a deterministic responder for the test."""
|
||||
client._api_request = responder # type: ignore[method-assign]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_user_playlists — pagination + normalization
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_get_user_playlists_returns_normalized_metadata(authed_client):
|
||||
calls: List[Dict[str, Any]] = []
|
||||
|
||||
def responder(endpoint, params=None):
|
||||
calls.append({'endpoint': endpoint, 'params': params})
|
||||
return {
|
||||
'playlists': {
|
||||
'items': [
|
||||
{
|
||||
'id': 1001,
|
||||
'name': 'My Mix',
|
||||
'description': 'on repeat',
|
||||
'is_public': True,
|
||||
'tracks_count': 12,
|
||||
'images': ['https://qobuz.example/cover.jpg'],
|
||||
},
|
||||
],
|
||||
'total': 1,
|
||||
}
|
||||
}
|
||||
|
||||
_install_api_responder(authed_client, responder)
|
||||
playlists = authed_client.get_user_playlists()
|
||||
|
||||
assert calls == [{
|
||||
'endpoint': 'playlist/getUserPlaylists',
|
||||
'params': {'limit': 100, 'offset': 0},
|
||||
}]
|
||||
assert playlists == [{
|
||||
'id': '1001',
|
||||
'name': 'My Mix',
|
||||
'description': 'on repeat',
|
||||
'public': True,
|
||||
'track_count': 12,
|
||||
'image_url': 'https://qobuz.example/cover.jpg',
|
||||
'external_urls': {'qobuz': 'https://play.qobuz.com/playlist/1001'},
|
||||
}]
|
||||
|
||||
|
||||
def test_get_user_playlists_paginates_until_total_reached(authed_client):
|
||||
# Two pages of 100 each, third page returns empty to verify the loop
|
||||
# terminates on `total` rather than waiting for an empty page.
|
||||
page_one = [{'id': i, 'name': f'P{i}', 'tracks_count': 0} for i in range(100)]
|
||||
page_two = [{'id': 100 + i, 'name': f'P{100 + i}', 'tracks_count': 0} for i in range(50)]
|
||||
calls: List[int] = []
|
||||
|
||||
def responder(endpoint, params=None):
|
||||
calls.append(params['offset'])
|
||||
if params['offset'] == 0:
|
||||
return {'playlists': {'items': page_one, 'total': 150}}
|
||||
if params['offset'] == 100:
|
||||
return {'playlists': {'items': page_two, 'total': 150}}
|
||||
return {'playlists': {'items': [], 'total': 150}}
|
||||
|
||||
_install_api_responder(authed_client, responder)
|
||||
playlists = authed_client.get_user_playlists()
|
||||
|
||||
assert len(playlists) == 150
|
||||
assert calls == [0, 100] # No third request needed
|
||||
|
||||
|
||||
def test_get_user_playlists_returns_empty_when_unauthenticated(qobuz_client_module):
|
||||
module, _ = qobuz_client_module
|
||||
client = module.QobuzClient() # no credentials configured
|
||||
assert client.is_authenticated() is False
|
||||
|
||||
def responder(endpoint, params=None):
|
||||
raise AssertionError('should not hit the API when unauthenticated')
|
||||
|
||||
_install_api_responder(client, responder)
|
||||
assert client.get_user_playlists() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_playlist — track pagination + normalization
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_get_playlist_normalizes_tracks(authed_client):
|
||||
def responder(endpoint, params=None):
|
||||
assert endpoint == 'playlist/get'
|
||||
return {
|
||||
'id': 2002,
|
||||
'name': 'Deep Cuts',
|
||||
'description': '',
|
||||
'is_public': False,
|
||||
'tracks_count': 1,
|
||||
'images': ['https://qobuz.example/dc.jpg'],
|
||||
'tracks': {
|
||||
'items': [
|
||||
{
|
||||
'id': 555,
|
||||
'title': 'Forgotten Track',
|
||||
'duration': 240,
|
||||
'parental_warning': True,
|
||||
'performer': {'name': 'Some Artist'},
|
||||
'album': {
|
||||
'title': 'Some Album',
|
||||
'image': {'large': 'https://qobuz.example/art.jpg'},
|
||||
},
|
||||
},
|
||||
],
|
||||
'total': 1,
|
||||
},
|
||||
}
|
||||
|
||||
_install_api_responder(authed_client, responder)
|
||||
playlist = authed_client.get_playlist('2002')
|
||||
|
||||
assert playlist is not None
|
||||
assert playlist['id'] == '2002'
|
||||
assert playlist['name'] == 'Deep Cuts'
|
||||
assert playlist['track_count'] == 1
|
||||
assert playlist['tracks'] == [{
|
||||
'id': '555',
|
||||
'name': 'Forgotten Track',
|
||||
'artists': ['Some Artist'],
|
||||
'album': 'Some Album',
|
||||
'duration_ms': 240_000,
|
||||
'image_url': 'https://qobuz.example/art.jpg',
|
||||
'external_urls': {'qobuz': 'https://play.qobuz.com/track/555'},
|
||||
'explicit': True,
|
||||
}]
|
||||
|
||||
|
||||
def test_get_playlist_routes_favorites_virtual_id(authed_client):
|
||||
"""The virtual `qobuz-favorites` ID must dispatch to the favorites
|
||||
endpoint rather than the playlist/get endpoint — mirrors Tidal's
|
||||
COLLECTION_PLAYLIST_ID pattern."""
|
||||
seen_endpoints: List[str] = []
|
||||
|
||||
def responder(endpoint, params=None):
|
||||
seen_endpoints.append(endpoint)
|
||||
# favorite/getUserFavorites is the only endpoint that should fire
|
||||
return {
|
||||
'tracks': {
|
||||
'items': [
|
||||
{
|
||||
'id': 777,
|
||||
'title': 'Liked Song',
|
||||
'duration': 180,
|
||||
'performer': {'name': 'Loved Artist'},
|
||||
'album': {'title': 'Heart Album', 'image': {'large': 'https://q.example/h.jpg'}},
|
||||
},
|
||||
],
|
||||
'total': 1,
|
||||
}
|
||||
}
|
||||
|
||||
_install_api_responder(authed_client, responder)
|
||||
playlist = authed_client.get_playlist(authed_client.QOBUZ_FAVORITES_ID)
|
||||
|
||||
assert playlist is not None
|
||||
assert playlist['id'] == authed_client.QOBUZ_FAVORITES_ID
|
||||
assert playlist['name'] == authed_client.QOBUZ_FAVORITES_NAME
|
||||
assert playlist['track_count'] == 1
|
||||
assert playlist['tracks'][0]['name'] == 'Liked Song'
|
||||
# Only the favorites endpoint should have been hit — no playlist/get.
|
||||
assert seen_endpoints == ['favorite/getUserFavorites']
|
||||
|
||||
|
||||
def test_get_playlist_paginates_track_list(authed_client):
|
||||
page_one_tracks = [
|
||||
{'id': i, 'title': f'T{i}', 'duration': 100, 'performer': {'name': 'A'}, 'album': {'title': 'Alb', 'image': {}}}
|
||||
for i in range(100)
|
||||
]
|
||||
page_two_tracks = [
|
||||
{'id': 100 + i, 'title': f'T{100 + i}', 'duration': 100, 'performer': {'name': 'A'}, 'album': {'title': 'Alb', 'image': {}}}
|
||||
for i in range(25)
|
||||
]
|
||||
offsets: List[int] = []
|
||||
|
||||
def responder(endpoint, params=None):
|
||||
offsets.append(params['offset'])
|
||||
if params['offset'] == 0:
|
||||
return {
|
||||
'id': 'X', 'name': 'Long', 'description': '', 'is_public': False,
|
||||
'tracks_count': 125, 'images': [],
|
||||
'tracks': {'items': page_one_tracks, 'total': 125},
|
||||
}
|
||||
if params['offset'] == 100:
|
||||
return {
|
||||
'id': 'X', 'name': 'Long', 'description': '', 'is_public': False,
|
||||
'tracks_count': 125, 'images': [],
|
||||
'tracks': {'items': page_two_tracks, 'total': 125},
|
||||
}
|
||||
return {'tracks': {'items': [], 'total': 125}}
|
||||
|
||||
_install_api_responder(authed_client, responder)
|
||||
playlist = authed_client.get_playlist('X')
|
||||
|
||||
assert playlist is not None
|
||||
assert len(playlist['tracks']) == 125
|
||||
assert playlist['track_count'] == 125
|
||||
assert offsets == [0, 100]
|
||||
|
||||
|
||||
def test_get_playlist_returns_none_when_unauthenticated(qobuz_client_module):
|
||||
module, _ = qobuz_client_module
|
||||
client = module.QobuzClient()
|
||||
assert client.get_playlist('whatever') is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_user_favorite_tracks + get_user_favorite_tracks_count
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_get_user_favorite_tracks_paginates(authed_client):
|
||||
def make_items(start, count):
|
||||
return [
|
||||
{'id': start + i, 'title': f'F{start + i}', 'duration': 200,
|
||||
'performer': {'name': 'Fav Artist'},
|
||||
'album': {'title': 'Fav Album', 'image': {}}}
|
||||
for i in range(count)
|
||||
]
|
||||
|
||||
offsets: List[int] = []
|
||||
|
||||
def responder(endpoint, params=None):
|
||||
assert endpoint == 'favorite/getUserFavorites'
|
||||
assert params['type'] == 'tracks'
|
||||
offsets.append(params['offset'])
|
||||
if params['offset'] == 0:
|
||||
return {'tracks': {'items': make_items(0, 100), 'total': 130}}
|
||||
if params['offset'] == 100:
|
||||
return {'tracks': {'items': make_items(100, 30), 'total': 130}}
|
||||
return {'tracks': {'items': [], 'total': 130}}
|
||||
|
||||
_install_api_responder(authed_client, responder)
|
||||
tracks = authed_client.get_user_favorite_tracks()
|
||||
|
||||
assert len(tracks) == 130
|
||||
assert offsets == [0, 100]
|
||||
assert tracks[0]['name'] == 'F0'
|
||||
assert tracks[-1]['name'] == 'F129'
|
||||
|
||||
|
||||
def test_get_user_favorite_tracks_count_uses_cheap_call(authed_client):
|
||||
captured: Dict[str, Any] = {}
|
||||
|
||||
def responder(endpoint, params=None):
|
||||
captured['endpoint'] = endpoint
|
||||
captured['params'] = params
|
||||
return {'tracks': {'items': [], 'total': 4242}}
|
||||
|
||||
_install_api_responder(authed_client, responder)
|
||||
count = authed_client.get_user_favorite_tracks_count()
|
||||
|
||||
assert count == 4242
|
||||
# Single request with limit=1 — must not iterate the full list.
|
||||
assert captured == {
|
||||
'endpoint': 'favorite/getUserFavorites',
|
||||
'params': {'type': 'tracks', 'limit': 1, 'offset': 0},
|
||||
}
|
||||
|
||||
|
||||
def test_get_user_favorite_tracks_count_returns_zero_when_unauthenticated(qobuz_client_module):
|
||||
module, _ = qobuz_client_module
|
||||
client = module.QobuzClient()
|
||||
assert client.get_user_favorite_tracks_count() == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Track normalization fallbacks — artist resolution chain
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_track_normalization_falls_back_to_album_artist(authed_client):
|
||||
"""When `performer.name` is missing, album.artist.name should win
|
||||
over the bare 'Unknown Artist' default."""
|
||||
def responder(endpoint, params=None):
|
||||
return {
|
||||
'id': 'P', 'name': 'p', 'description': '', 'is_public': False,
|
||||
'tracks_count': 1, 'images': [],
|
||||
'tracks': {
|
||||
'items': [{
|
||||
'id': 1, 'title': 'X', 'duration': 10,
|
||||
'album': {
|
||||
'title': 'A',
|
||||
'artist': {'name': 'Album Artist'},
|
||||
'image': {'large': ''},
|
||||
},
|
||||
}],
|
||||
'total': 1,
|
||||
}
|
||||
}
|
||||
|
||||
_install_api_responder(authed_client, responder)
|
||||
playlist = authed_client.get_playlist('P')
|
||||
assert playlist['tracks'][0]['artists'] == ['Album Artist']
|
||||
|
||||
|
||||
def test_track_normalization_uses_unknown_artist_when_all_sources_empty(authed_client):
|
||||
def responder(endpoint, params=None):
|
||||
return {
|
||||
'id': 'P', 'name': 'p', 'description': '', 'is_public': False,
|
||||
'tracks_count': 1, 'images': [],
|
||||
'tracks': {
|
||||
'items': [{'id': 1, 'title': 'X', 'duration': 10}],
|
||||
'total': 1,
|
||||
}
|
||||
}
|
||||
|
||||
_install_api_responder(authed_client, responder)
|
||||
playlist = authed_client.get_playlist('P')
|
||||
assert playlist['tracks'][0]['artists'] == ['Unknown Artist']
|
||||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in new issue