mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
506 lines
17 KiB
506 lines
17 KiB
"""Tests for core/search/orchestrator.py — main enhanced-search dispatch + streaming."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
import pytest
|
|
|
|
from core.search import orchestrator
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fakes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class _Artist:
|
|
def __init__(self, id_, name, image_url=None, external_urls=None, thumb_url=None):
|
|
self.id = id_
|
|
self.name = name
|
|
self.image_url = image_url
|
|
self.external_urls = external_urls
|
|
self.thumb_url = thumb_url
|
|
|
|
|
|
class _Album:
|
|
def __init__(self, id_, name, artists=None, image_url=None, release_date=None,
|
|
total_tracks=10, album_type='album', external_urls=None):
|
|
self.id = id_
|
|
self.name = name
|
|
self.artists = artists or []
|
|
self.image_url = image_url
|
|
self.release_date = release_date
|
|
self.total_tracks = total_tracks
|
|
self.album_type = album_type
|
|
self.external_urls = external_urls
|
|
|
|
|
|
class _Track:
|
|
def __init__(self, id_, name, artists=None, album=None, duration_ms=180000,
|
|
image_url=None, release_date=None, external_urls=None):
|
|
self.id = id_
|
|
self.name = name
|
|
self.artists = artists or []
|
|
self.album = album
|
|
self.duration_ms = duration_ms
|
|
self.image_url = image_url
|
|
self.release_date = release_date
|
|
self.external_urls = external_urls
|
|
|
|
|
|
class _Client:
|
|
def __init__(self, *, name='fake', artists=None, albums=None, tracks=None,
|
|
fail_search=False, authed=True, connected=True):
|
|
self.name = name
|
|
self._artists = artists or []
|
|
self._albums = albums or []
|
|
self._tracks = tracks or []
|
|
self._fail = fail_search
|
|
self._authed = authed
|
|
self._connected = connected
|
|
|
|
def search_artists(self, q, limit=10):
|
|
if self._fail:
|
|
raise RuntimeError("client search boom")
|
|
return self._artists
|
|
|
|
def search_albums(self, q, limit=10):
|
|
if self._fail:
|
|
raise RuntimeError("client search boom")
|
|
return self._albums
|
|
|
|
def search_tracks(self, q, limit=10):
|
|
if self._fail:
|
|
raise RuntimeError("client search boom")
|
|
return self._tracks
|
|
|
|
def is_spotify_authenticated(self):
|
|
return self._authed
|
|
|
|
def is_connected(self):
|
|
return self._connected
|
|
|
|
|
|
class _DB:
|
|
def __init__(self, artists=None):
|
|
self._artists = artists or []
|
|
|
|
def search_artists(self, q, limit=5, server_source=None):
|
|
return self._artists
|
|
|
|
|
|
class _Cfg:
|
|
def __init__(self, values=None):
|
|
self._v = values or {}
|
|
|
|
def get(self, k, default=None):
|
|
return self._v.get(k, default)
|
|
|
|
def get_active_media_server(self):
|
|
return self._v.get('__active_server', 'plex')
|
|
|
|
|
|
class _Worker:
|
|
def __init__(self):
|
|
self.enqueued = []
|
|
|
|
def enqueue(self, query, kind):
|
|
self.enqueued.append((query, kind))
|
|
|
|
|
|
def _sync_run_async(coro):
|
|
"""Run a coroutine synchronously on a fresh loop."""
|
|
import asyncio
|
|
import inspect
|
|
if not inspect.iscoroutine(coro):
|
|
return coro
|
|
loop = asyncio.new_event_loop()
|
|
try:
|
|
return loop.run_until_complete(coro)
|
|
finally:
|
|
loop.close()
|
|
|
|
|
|
def _build_deps(**overrides):
|
|
"""Default deps for an enhanced-search call. Override with kwargs."""
|
|
base = dict(
|
|
database=_DB(),
|
|
config_manager=_Cfg({'discogs.token': ''}),
|
|
spotify_client=None,
|
|
hydrabase_client=None,
|
|
hydrabase_worker=None,
|
|
download_orchestrator=None,
|
|
fix_artist_image_url=lambda u: f'FIXED::{u}' if u else None,
|
|
is_hydrabase_active=lambda: False,
|
|
get_metadata_fallback_source=lambda: 'spotify',
|
|
get_metadata_fallback_client=lambda: _Client(name='fallback'),
|
|
get_itunes_client=lambda: _Client(name='itunes'),
|
|
get_deezer_client=lambda: _Client(name='deezer'),
|
|
get_discogs_client=lambda token=None: _Client(name='discogs'),
|
|
run_background_comparison=lambda *a, **k: None,
|
|
run_async=_sync_run_async,
|
|
dev_mode_enabled_provider=lambda: False,
|
|
)
|
|
base.update(overrides)
|
|
return orchestrator.SearchDeps(**base)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# resolve_client
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_resolve_spotify_authed_returns_client():
|
|
deps = _build_deps(spotify_client=_Client(authed=True))
|
|
client, ok = orchestrator.resolve_client('spotify', deps)
|
|
assert client is deps.spotify_client
|
|
assert ok is True
|
|
|
|
|
|
def test_resolve_spotify_unauthed_returns_none():
|
|
deps = _build_deps(spotify_client=_Client(authed=False))
|
|
client, ok = orchestrator.resolve_client('spotify', deps)
|
|
assert client is None
|
|
assert ok is False
|
|
|
|
|
|
def test_resolve_spotify_missing_returns_none():
|
|
deps = _build_deps(spotify_client=None)
|
|
client, ok = orchestrator.resolve_client('spotify', deps)
|
|
assert client is None
|
|
assert ok is False
|
|
|
|
|
|
def test_resolve_itunes_always_returns_client():
|
|
deps = _build_deps()
|
|
client, ok = orchestrator.resolve_client('itunes', deps)
|
|
assert client.name == 'itunes'
|
|
assert ok is True
|
|
|
|
|
|
def test_resolve_deezer_always_returns_client():
|
|
deps = _build_deps()
|
|
client, ok = orchestrator.resolve_client('deezer', deps)
|
|
assert client.name == 'deezer'
|
|
assert ok is True
|
|
|
|
|
|
def test_resolve_discogs_with_token_returns_client():
|
|
deps = _build_deps(config_manager=_Cfg({'discogs.token': 'tok'}))
|
|
client, ok = orchestrator.resolve_client('discogs', deps)
|
|
assert client.name == 'discogs'
|
|
assert ok is True
|
|
|
|
|
|
def test_resolve_discogs_without_token_returns_none():
|
|
deps = _build_deps(config_manager=_Cfg({'discogs.token': ''}))
|
|
client, ok = orchestrator.resolve_client('discogs', deps)
|
|
assert client is None
|
|
assert ok is False
|
|
|
|
|
|
def test_resolve_hydrabase_connected_returns_client():
|
|
deps = _build_deps(hydrabase_client=_Client(connected=True))
|
|
client, ok = orchestrator.resolve_client('hydrabase', deps)
|
|
assert client is deps.hydrabase_client
|
|
assert ok is True
|
|
|
|
|
|
def test_resolve_hydrabase_disconnected_returns_none():
|
|
deps = _build_deps(hydrabase_client=_Client(connected=False))
|
|
client, ok = orchestrator.resolve_client('hydrabase', deps)
|
|
assert client is None
|
|
assert ok is False
|
|
|
|
|
|
def test_resolve_unknown_source_returns_none():
|
|
deps = _build_deps()
|
|
client, ok = orchestrator.resolve_client('garbage', deps)
|
|
assert client is None
|
|
assert ok is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# run_enhanced_search — short query path
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_short_query_skips_remote_search():
|
|
db_artist = _Artist('a1', 'Aretha', thumb_url='http://x/a.jpg')
|
|
deps = _build_deps(database=_DB(artists=[db_artist]))
|
|
|
|
result = orchestrator.run_enhanced_search('aa', '', deps)
|
|
assert result['db_artists'][0]['name'] == 'Aretha'
|
|
assert result['spotify_artists'] == []
|
|
assert result['spotify_albums'] == []
|
|
assert result['spotify_tracks'] == []
|
|
assert result['primary_source'] == 'spotify'
|
|
assert result['alternate_sources'] == []
|
|
|
|
|
|
def test_short_query_with_explicit_source_uses_that_source_label():
|
|
deps = _build_deps()
|
|
result = orchestrator.run_enhanced_search('aa', 'deezer', deps)
|
|
assert result['primary_source'] == 'deezer'
|
|
assert result['metadata_source'] == 'deezer'
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# run_enhanced_search — single source
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_single_source_runs_only_that_source():
|
|
spot = _Client(authed=True, artists=[_Artist('s1', 'Spot Artist')])
|
|
deps = _build_deps(spotify_client=spot)
|
|
result = orchestrator.run_enhanced_search('pink floyd', 'spotify', deps)
|
|
|
|
assert result['primary_source'] == 'spotify'
|
|
assert result['metadata_source'] == 'spotify'
|
|
assert result['source_available'] is True
|
|
assert result['spotify_artists'][0]['name'] == 'Spot Artist'
|
|
assert result['alternate_sources'] == []
|
|
|
|
|
|
def test_single_source_unavailable_returns_empty_with_source_available_false():
|
|
deps = _build_deps(spotify_client=None)
|
|
result = orchestrator.run_enhanced_search('pink floyd', 'spotify', deps)
|
|
assert result['source_available'] is False
|
|
assert result['spotify_artists'] == []
|
|
assert result['primary_source'] == 'spotify'
|
|
|
|
|
|
def test_single_source_search_failure_returns_empty():
|
|
spot = _Client(authed=True, fail_search=True)
|
|
deps = _build_deps(spotify_client=spot)
|
|
result = orchestrator.run_enhanced_search('q', 'spotify', deps)
|
|
# search_source still returns a wrapper because per-kind exceptions are
|
|
# swallowed inside it, so we get [] for each kind, source_available=True
|
|
assert result['spotify_artists'] == []
|
|
assert result['spotify_albums'] == []
|
|
assert result['spotify_tracks'] == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# run_enhanced_search — fan-out
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_fanout_uses_fallback_client_as_primary():
|
|
fb_client = _Client(artists=[_Artist('f1', 'Fallback Artist')])
|
|
deps = _build_deps(
|
|
get_metadata_fallback_source=lambda: 'deezer',
|
|
get_metadata_fallback_client=lambda: fb_client,
|
|
)
|
|
result = orchestrator.run_enhanced_search('pink floyd', '', deps)
|
|
|
|
assert result['primary_source'] == 'deezer'
|
|
assert result['spotify_artists'][0]['name'] == 'Fallback Artist'
|
|
|
|
|
|
def test_fanout_lists_alternate_sources_excluding_primary():
|
|
deps = _build_deps(
|
|
get_metadata_fallback_source=lambda: 'deezer',
|
|
spotify_client=_Client(authed=True),
|
|
)
|
|
result = orchestrator.run_enhanced_search('pink floyd', '', deps)
|
|
alts = result['alternate_sources']
|
|
assert 'deezer' not in alts # primary excluded
|
|
assert 'itunes' in alts
|
|
assert 'spotify' in alts
|
|
assert 'youtube_videos' in alts
|
|
assert 'musicbrainz' in alts
|
|
|
|
|
|
def test_fanout_omits_spotify_alternate_when_unauthed():
|
|
deps = _build_deps(
|
|
get_metadata_fallback_source=lambda: 'deezer',
|
|
spotify_client=_Client(authed=False),
|
|
)
|
|
result = orchestrator.run_enhanced_search('pink floyd', '', deps)
|
|
assert 'spotify' not in result['alternate_sources']
|
|
|
|
|
|
def test_fanout_omits_discogs_alternate_when_no_token():
|
|
deps = _build_deps(
|
|
get_metadata_fallback_source=lambda: 'deezer',
|
|
config_manager=_Cfg({'discogs.token': ''}),
|
|
)
|
|
result = orchestrator.run_enhanced_search('pink floyd', '', deps)
|
|
assert 'discogs' not in result['alternate_sources']
|
|
|
|
|
|
def test_fanout_includes_discogs_alternate_when_token_set():
|
|
deps = _build_deps(
|
|
get_metadata_fallback_source=lambda: 'deezer',
|
|
config_manager=_Cfg({'discogs.token': 'abc'}),
|
|
)
|
|
result = orchestrator.run_enhanced_search('pink floyd', '', deps)
|
|
assert 'discogs' in result['alternate_sources']
|
|
|
|
|
|
def test_fanout_omits_hydrabase_alternate_when_disconnected():
|
|
deps = _build_deps(
|
|
get_metadata_fallback_source=lambda: 'deezer',
|
|
hydrabase_client=None,
|
|
)
|
|
result = orchestrator.run_enhanced_search('pink floyd', '', deps)
|
|
assert 'hydrabase' not in result['alternate_sources']
|
|
|
|
|
|
def test_fanout_hydrabase_primary_runs_hydrabase_first():
|
|
hydra = _Client(connected=True, artists=[_Artist('h1', 'Hydra Artist')])
|
|
deps = _build_deps(
|
|
is_hydrabase_active=lambda: True,
|
|
hydrabase_client=hydra,
|
|
)
|
|
result = orchestrator.run_enhanced_search('pink floyd', '', deps)
|
|
assert result['primary_source'] == 'hydrabase'
|
|
assert result['spotify_artists'][0]['name'] == 'Hydra Artist'
|
|
|
|
|
|
def test_fanout_hydrabase_failure_falls_through_to_spotify_default():
|
|
hydra_fail = _Client(connected=True, fail_search=True)
|
|
deps = _build_deps(
|
|
is_hydrabase_active=lambda: True,
|
|
hydrabase_client=hydra_fail,
|
|
get_metadata_fallback_source=lambda: 'spotify',
|
|
get_metadata_fallback_client=lambda: _Client(name='spotify-fb'),
|
|
)
|
|
# Should not raise
|
|
result = orchestrator.run_enhanced_search('q', '', deps)
|
|
# search_source still returns a wrapper because per-kind exceptions are
|
|
# swallowed inside it — so primary_results.tracks is []. Code keeps
|
|
# primary_source='hydrabase' because search_source returned a value.
|
|
assert result is not None
|
|
|
|
|
|
def test_fanout_hydrabase_worker_enqueued_when_dev_mode_enabled():
|
|
worker = _Worker()
|
|
deps = _build_deps(
|
|
hydrabase_worker=worker,
|
|
dev_mode_enabled_provider=lambda: True,
|
|
get_metadata_fallback_source=lambda: 'deezer',
|
|
)
|
|
orchestrator.run_enhanced_search('pink floyd', '', deps)
|
|
enqueued_kinds = {kind for _q, kind in worker.enqueued}
|
|
assert enqueued_kinds == {'tracks', 'albums', 'artists'}
|
|
|
|
|
|
def test_fanout_hydrabase_worker_skipped_in_prod_mode():
|
|
worker = _Worker()
|
|
deps = _build_deps(
|
|
hydrabase_worker=worker,
|
|
dev_mode_enabled_provider=lambda: False,
|
|
get_metadata_fallback_source=lambda: 'deezer',
|
|
)
|
|
orchestrator.run_enhanced_search('pink floyd', '', deps)
|
|
assert worker.enqueued == []
|
|
|
|
|
|
def test_fanout_db_artists_get_image_url_fixed():
|
|
db_artist = _Artist('a1', 'Aretha', thumb_url='/library/a.jpg')
|
|
deps = _build_deps(database=_DB(artists=[db_artist]))
|
|
result = orchestrator.run_enhanced_search('pink floyd', '', deps)
|
|
assert result['db_artists'][0]['image_url'] == 'FIXED::/library/a.jpg'
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# empty_response
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_empty_response_keys():
|
|
r = orchestrator.empty_response()
|
|
for k in ('db_artists', 'spotify_artists', 'spotify_albums', 'spotify_tracks',
|
|
'sources', 'primary_source', 'metadata_source'):
|
|
assert k in r
|
|
assert r['primary_source'] == 'spotify'
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Streaming generators + youtube_videos client resolution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _drain(generator):
|
|
"""Drain an NDJSON generator into a list of parsed JSON dicts."""
|
|
out = []
|
|
for line in generator:
|
|
out.append(json.loads(line.rstrip('\n')))
|
|
return out
|
|
|
|
|
|
def test_stream_metadata_source_yields_three_kinds_plus_done():
|
|
spot = _Client(
|
|
authed=True,
|
|
artists=[_Artist('a', 'A')],
|
|
albums=[_Album('b', 'B')],
|
|
tracks=[_Track('c', 'C')],
|
|
)
|
|
out = _drain(orchestrator.stream_metadata_source('spotify', 'q', spot))
|
|
types = [m['type'] for m in out]
|
|
assert 'artists' in types
|
|
assert 'albums' in types
|
|
assert 'tracks' in types
|
|
assert types[-1] == 'done'
|
|
|
|
|
|
class _FakeYouTubeVideo:
|
|
def __init__(self, vid):
|
|
self.video_id = vid
|
|
self.title = f"Title {vid}"
|
|
self.channel = "Chan"
|
|
self.duration = 100
|
|
self.thumbnail = f"thumb-{vid}.jpg"
|
|
self.url = f"https://yt/{vid}"
|
|
self.view_count = 1000
|
|
self.upload_date = "20260101"
|
|
|
|
|
|
class _FakeYouTube:
|
|
def __init__(self, results=None):
|
|
self._results = results or []
|
|
|
|
async def search_videos(self, q, max_results=20):
|
|
return self._results
|
|
|
|
|
|
class _FakeSoulseekWithYT:
|
|
def __init__(self, youtube):
|
|
self._youtube = youtube
|
|
|
|
def client(self, name):
|
|
return self._youtube if name == 'youtube' else None
|
|
|
|
|
|
def test_resolve_youtube_videos_returns_subclient():
|
|
yt = _FakeYouTube()
|
|
deps = _build_deps(download_orchestrator=_FakeSoulseekWithYT(yt))
|
|
assert orchestrator.resolve_youtube_videos_client(deps) is yt
|
|
|
|
|
|
def test_resolve_youtube_videos_no_soulseek_returns_none():
|
|
deps = _build_deps(download_orchestrator=None)
|
|
assert orchestrator.resolve_youtube_videos_client(deps) is None
|
|
|
|
|
|
def test_resolve_youtube_videos_no_youtube_attr_returns_none():
|
|
class _NoYT:
|
|
pass
|
|
deps = _build_deps(download_orchestrator=_NoYT())
|
|
assert orchestrator.resolve_youtube_videos_client(deps) is None
|
|
|
|
|
|
def test_stream_youtube_videos_yields_videos_chunk_and_done():
|
|
yt = _FakeYouTube(results=[_FakeYouTubeVideo('vid1'), _FakeYouTubeVideo('vid2')])
|
|
out = _drain(orchestrator.stream_youtube_videos('q', yt, _sync_run_async))
|
|
assert out[0]['type'] == 'videos'
|
|
assert len(out[0]['data']) == 2
|
|
assert out[0]['data'][0]['video_id'] == 'vid1'
|
|
assert out[-1]['type'] == 'done'
|
|
|
|
|
|
def test_stream_youtube_videos_search_failure_yields_empty_videos():
|
|
class _BadYT:
|
|
async def search_videos(self, q, max_results=20):
|
|
raise RuntimeError("yt-dlp boom")
|
|
|
|
out = _drain(orchestrator.stream_youtube_videos('q', _BadYT(), _sync_run_async))
|
|
assert out[0] == {'type': 'videos', 'data': []}
|
|
assert out[-1] == {'type': 'done'}
|