PR5c: lift _run_playlist_discovery_worker to core/discovery/playlist.py

Third lift in the PR5 discovery-workers series. Pulls the 323-line
mirrored-playlist discovery worker out of `web_server.py` into its own
focused module under `core/discovery/`. Pure 1:1 lift — wrapper keeps
the original entry-point name so the existing call site
(`_run_playlist_discovery_worker(pls, automation_id=None)` from the
automation engine) continues to work without changes.

What the playlist discovery worker does:

1. Pause enrichment workers (release shared resources).
2. Pre-compute total track count across all playlists for the automation
   progress card.
3. For each playlist:
   - Fast pre-scan separates already-discovered tracks (skipped, unless
     incomplete metadata or a Wing It stub) from undiscovered ones.
   - For each undiscovered track:
     - Cancellation gate via _playlist_discovery_cancelled set.
     - Discovery cache lookup (with artist validation).
     - matching_engine search-query generation, then Spotify (preferred)
       or iTunes (fallback) search + scoring.
     - Extended search fallback (limit=50) if no high-confidence match.
     - On match → enrich album from metadata cache (id, images,
       total_tracks, album_type, release_date, artists, plus track_number
       and disc_number), build matched_data, write to track.extra_data,
       save to discovery cache.
     - On miss → Wing It stub stored as 'wing_it_fallback' provider.
4. After all playlists: emit `discovery_completed` event when at least
   one new track was discovered, mark automation progress 'finished'.
5. On error → automation progress 'error', traceback printed.
6. Finally: resume enrichment workers.

Dependencies injected via `PlaylistDiscoveryDeps` (16 fields) —
spotify_client, matching_engine, automation_engine, the cancellation
set, plus 12 callable helpers (pause/resume enrichment,
get_active_discovery_source, get_metadata_fallback_client/source,
update_automation_progress, get_database, get_discovery_cache_key,
validate_discovery_cache_artist, discovery_score_candidates,
get_metadata_cache, build_discovery_wing_it_stub).

Diff vs original after `deps.X` → global X normalization is **zero
differences** — 323 lines orig = 323 lines lifted, byte-identical body
(including all whitespace, comments, log strings).

Tests: 15 new under tests/discovery/test_discovery_playlist.py covering
empty playlists, no-tracks playlist skip, complete-discovery skip,
incomplete-discovery re-run, Wing It always re-run, unmatched_by_user
respect, cache hit short-circuit, match above threshold (extra_data +
cache save), match below threshold falls to Wing It, iTunes fallback,
neither-provider error path, cancellation, discovery_completed event
emit, no-event on zero-discovered, multi-playlist grand_total
aggregation.

Full suite: 1098 passing (was 1083). Ruff clean.
pull/407/head
Broque Thomas 4 weeks ago
parent f5ccc55bfc
commit bda0500226

@ -0,0 +1,386 @@
"""Background worker for mirrored playlist track discovery.

`run_playlist_discovery_worker(playlists, automation_id, deps)` is the
function the automation engine schedules to enrich undiscovered mirrored
playlist tracks with Spotify (preferred) or iTunes (fallback) metadata:

1. Pause enrichment workers and pre-compute total track count for the
   automation progress card.
2. For each playlist:
   - Fast pre-scan separates already-discovered tracks (skipped, unless
     incomplete metadata or a Wing It stub) from undiscovered ones.
   - For each undiscovered track:
     - Cancellation gate.
     - Discovery cache lookup (with artist validation).
     - matching_engine search-query generation, then Spotify/iTunes
       search + scoring across queries.
     - Extended search fallback (limit=50) if no high-confidence match.
     - On match → enrich album from metadata cache, build matched_data,
       store in track.extra_data, save discovery cache entry.
     - On miss → Wing It stub stored as 'wing_it_fallback' provider.
3. After all playlists: emit `discovery_completed` event when at least
   one new track was discovered, mark automation progress 'finished'.
4. On error → automation progress 'error', traceback printed.
5. Finally: resume enrichment workers.

Lifted verbatim from web_server.py. Wide dependency surface (Spotify
and iTunes clients, matching engine, discovery helpers, DB access,
automation engine, cancellation set) all injected via
`PlaylistDiscoveryDeps`.
"""
from __future__ import annotations
import json
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable
logger = logging.getLogger(__name__)
@dataclass
class PlaylistDiscoveryDeps:
    """Bundle of cross-cutting deps the playlist discovery worker needs.

    Every field is looked up as ``deps.<name>`` by
    ``run_playlist_discovery_worker``; the comments below describe only how
    that worker uses each one.
    """
    # Spotify client; the worker calls is_spotify_authenticated() and
    # search_tracks(query, limit=...) on it. May be falsy (then the fallback
    # client is used instead).
    spotify_client: Any
    # Object providing generate_download_queries(track) -> iterable of query strings.
    matching_engine: Any
    # Object providing emit(event_name, payload); skipped when falsy.
    automation_engine: Any
    # Automation ids whose run should stop; the worker discards an id once it
    # has honoured the cancellation.
    playlist_discovery_cancelled: set
    # Called with a human-readable label; its return value is handed back
    # unchanged to resume_enrichment_workers.
    pause_enrichment_workers: Callable[[str], dict]
    # Called in the worker's ``finally`` with (state, label).
    resume_enrichment_workers: Callable[[dict, str], None]
    # Returns the active source name; compared against 'spotify' and
    # upper-cased for log output.
    get_active_discovery_source: Callable[[], str]
    # Returns the non-Spotify client (must offer search_tracks(query, limit=...)).
    # If it raises, the worker reports an error and returns.
    get_metadata_fallback_client: Callable[[], Any]
    # Returns the fallback source's display name (used only in messages).
    get_metadata_fallback_source: Callable[[], str]
    # Called as update_automation_progress(automation_id, **fields), where the
    # fields include status/progress/phase/log_line/log_type/total/processed/
    # current_item.
    update_automation_progress: Callable
    # Returns the DB handle; the worker uses get_mirrored_playlist_tracks,
    # get_discovery_cache_match, update_mirrored_track_extra_data and
    # save_discovery_cache_match on it.
    get_database: Callable[[], Any]
    # Called as (track_name, artist_name); the result is indexed [0] and [1].
    get_discovery_cache_key: Callable
    # Called as (artist_name, cached_match); truthy result accepts the cache hit.
    validate_discovery_cache_artist: Callable
    # Called as (track_name, artist_name, duration_ms, results); must return a
    # 3-tuple whose first two items are (match, confidence).
    discovery_score_candidates: Callable
    # Returns a cache object offering get_entity(source, 'track', track_id).
    get_metadata_cache: Callable[[], Any]
    # Called as (track_name, artist_name, duration_ms); result is stored as
    # matched_data for Wing It fallbacks.
    build_discovery_wing_it_stub: Callable
def run_playlist_discovery_worker(playlists, automation_id=None, deps: PlaylistDiscoveryDeps = None):
    """Discover Spotify/iTunes metadata for undiscovered mirrored playlist tracks.

    Results are written to each track's ``extra_data`` (through
    ``db.update_mirrored_track_extra_data``) for later use by sync.

    Args:
        playlists: Sequence of playlist mappings. Each needs an ``'id'`` key
            and may carry ``'name'``. The sequence is iterated twice and
            indexed, so it must not be a one-shot iterator.
        automation_id: Forwarded to ``deps.update_automation_progress`` and
            checked against ``deps.playlist_discovery_cancelled`` before each
            track.
        deps: The injected dependencies. Required in practice even though
            the parameter has a ``None`` default.

    Raises:
        TypeError: If ``deps`` is not supplied. Any other failure is caught,
            reported as an automation 'error' status and not re-raised.
    """
    if deps is None:
        # Without deps the except/finally clauses below would themselves blow
        # up with confusing AttributeErrors; fail fast with a clear message.
        raise TypeError("run_playlist_discovery_worker() requires 'deps' (a PlaylistDiscoveryDeps)")

    min_confidence = 0.7
    _ew_state = {}
    try:
        _ew_state = deps.pause_enrichment_workers('mirrored playlist discovery')
        discovery_source = deps.get_active_discovery_source()
        use_spotify = (discovery_source == 'spotify') and deps.spotify_client and deps.spotify_client.is_spotify_authenticated()

        itunes_client_instance = None
        if not use_spotify:
            try:
                itunes_client_instance = deps.get_metadata_fallback_client()
            except Exception:
                logger.warning(f"Neither Spotify nor {deps.get_metadata_fallback_source()} available for discovery")
                deps.update_automation_progress(automation_id, status='error', progress=100,
                                                phase='Error', log_line=f'Neither Spotify nor {deps.get_metadata_fallback_source()} available',
                                                log_type='error')
                return

        total_discovered = 0
        # NOTE: never incremented below - misses become Wing It stubs, which
        # are counted as discovered. Kept because it is part of the emitted
        # event payload and the progress messages.
        total_failed = 0
        total_skipped = 0
        total_tracks = 0
        last_playlist_name = ''

        # Pre-compute grand total for progress tracking
        grand_total = 0
        db_init = deps.get_database()
        for pl in playlists:
            t = db_init.get_mirrored_playlist_tracks(pl['id'])
            if t:
                grand_total += len(t)
        deps.update_automation_progress(automation_id, total=grand_total)

        def _progress_pct():
            # Percentage of all tracks (across every playlist) handled so far.
            return ((total_skipped + total_discovered + total_failed) / max(1, grand_total)) * 100

        for pl in playlists:
            pl_id = pl['id']
            pl_name = pl.get('name', '')
            last_playlist_name = pl_name
            db = deps.get_database()
            tracks = db.get_mirrored_playlist_tracks(pl_id)
            if not tracks:
                continue

            logger.info(f"Starting discovery for playlist '{pl_name}' ({len(tracks)} tracks, using {discovery_source.upper()})")
            deps.update_automation_progress(automation_id, phase=f'Discovering: "{pl_name}"',
                                            log_line=f'Playlist "{pl_name}" — {len(tracks)} tracks ({discovery_source.upper()})', log_type='info')

            # Fast pre-scan: separate already-discovered from undiscovered
            undiscovered_tracks = []
            pl_skipped = 0
            for track in tracks:
                existing_extra = {}
                if track.get('extra_data'):
                    try:
                        existing_extra = json.loads(track['extra_data']) if isinstance(track['extra_data'], str) else track['extra_data']
                    except (json.JSONDecodeError, TypeError):
                        pass
                if not isinstance(existing_extra, dict):
                    # Valid JSON that is not an object (e.g. "[]") carries no
                    # discovery state; treat it like missing extra_data
                    # instead of crashing the whole run on .get().
                    existing_extra = {}

                if existing_extra.get('discovered'):
                    if existing_extra.get('wing_it_fallback'):
                        # Wing It stub — always re-attempt to find a real match
                        undiscovered_tracks.append(track)
                    else:
                        # Check if matched_data is complete — old discoveries may be missing
                        # track_number/release_date due to the Track dataclass stripping them.
                        # Re-discover these so the enriched pipeline fills in the gaps.
                        md = existing_extra.get('matched_data', {})
                        if not isinstance(md, dict):
                            md = {}
                        album = md.get('album', {})
                        has_track_num = md.get('track_number')
                        has_release = album.get('release_date') if isinstance(album, dict) else None
                        has_album_id = album.get('id') if isinstance(album, dict) else None
                        if has_track_num and (has_release or has_album_id):
                            pl_skipped += 1
                            total_skipped += 1
                        else:
                            # Incomplete discovery — re-discover to get full metadata
                            undiscovered_tracks.append(track)
                elif existing_extra.get('unmatched_by_user'):
                    # User explicitly removed this match — respect their choice
                    pl_skipped += 1
                    total_skipped += 1
                else:
                    undiscovered_tracks.append(track)

            if pl_skipped > 0:
                deps.update_automation_progress(automation_id,
                                                log_line=f'{pl_skipped} tracks already discovered — skipped', log_type='skip')

            if not undiscovered_tracks:
                deps.update_automation_progress(automation_id,
                                                progress=_progress_pct(),
                                                log_line=f'All {len(tracks)} tracks already discovered', log_type='skip')
                continue

            deps.update_automation_progress(automation_id,
                                            log_line=f'{len(undiscovered_tracks)} tracks to discover', log_type='info')

            for i, track in enumerate(undiscovered_tracks):
                # Check for cancellation
                if automation_id and automation_id in deps.playlist_discovery_cancelled:
                    deps.playlist_discovery_cancelled.discard(automation_id)
                    logger.warning(f"Playlist discovery cancelled (automation {automation_id})")
                    deps.update_automation_progress(automation_id, status='finished', progress=100,
                                                    phase='Discovery cancelled',
                                                    log_line=f'Cancelled: {total_discovered} discovered, {total_failed} failed',
                                                    log_type='info')
                    return

                total_tracks += 1
                track_id = track['id']
                track_name = track.get('track_name', '')
                artist_name = track.get('artist_name', '')
                duration_ms = track.get('duration_ms', 0)

                # Step 1: Check discovery cache
                cache_key = deps.get_discovery_cache_key(track_name, artist_name)
                try:
                    cached_match = db.get_discovery_cache_match(cache_key[0], cache_key[1], discovery_source)
                    if cached_match and deps.validate_discovery_cache_artist(artist_name, cached_match):
                        extra_data = {
                            'discovered': True,
                            'provider': discovery_source,
                            'confidence': cached_match.get('confidence', 0.85),
                            'matched_data': cached_match,
                        }
                        db.update_mirrored_track_extra_data(track_id, extra_data)
                        total_discovered += 1
                        logger.info(f"CACHE [{i+1}/{len(undiscovered_tracks)}]: {track_name} → {cached_match.get('name', '?')}")
                        deps.update_automation_progress(automation_id,
                                                        progress=_progress_pct(),
                                                        current_item=track_name,
                                                        log_line=f'{track_name} → {cached_match.get("name", "?")} (cache)', log_type='success')
                        continue
                except Exception:
                    # Best-effort: a broken cache must not stop live discovery.
                    logger.debug("Discovery cache lookup failed for %r", track_name, exc_info=True)

                # Step 2: Generate search queries
                try:
                    temp_track = type('TempTrack', (), {
                        'name': track_name,
                        'artists': [artist_name],
                        'album': None
                    })()
                    search_queries = deps.matching_engine.generate_download_queries(temp_track)
                except Exception:
                    search_queries = [f"{artist_name} {track_name}", track_name]

                # Step 3: Search and score
                best_match = None
                best_confidence = 0.0

                for search_query in search_queries:
                    try:
                        if use_spotify:
                            results = deps.spotify_client.search_tracks(search_query, limit=10)
                        else:
                            results = itunes_client_instance.search_tracks(search_query, limit=10)
                        if not results:
                            continue
                        match, confidence, _ = deps.discovery_score_candidates(
                            track_name, artist_name, duration_ms, results
                        )
                        if match and confidence > best_confidence:
                            best_confidence = confidence
                            best_match = match
                        if best_confidence >= 0.9:
                            # Good enough — stop spending API calls on this track.
                            break
                    except Exception:
                        logger.debug("Search failed for query %r", search_query, exc_info=True)
                        continue

                # Extended search fallback
                if not best_match or best_confidence < min_confidence:
                    try:
                        query = f"{artist_name} {track_name}"
                        if use_spotify:
                            extended = deps.spotify_client.search_tracks(query, limit=50)
                        else:
                            extended = itunes_client_instance.search_tracks(query, limit=50)
                        if extended:
                            match, confidence, _ = deps.discovery_score_candidates(
                                track_name, artist_name, duration_ms, extended
                            )
                            if match and confidence > best_confidence:
                                best_confidence = confidence
                                best_match = match
                    except Exception:
                        logger.debug("Extended search failed for %r", track_name, exc_info=True)

                # Step 4: Store results
                if best_match and best_confidence >= min_confidence:
                    match_artists = getattr(best_match, 'artists', [])
                    match_image = getattr(best_match, 'image_url', None)
                    album_name = getattr(best_match, 'album', '')
                    album_obj = {'name': album_name, 'release_date': getattr(best_match, 'release_date', '') or ''}
                    if match_image:
                        album_obj['images'] = [{'url': match_image, 'height': 600, 'width': 600}]

                    # Enrich album data from metadata cache — search_tracks() caches the
                    # raw API response which has full album info (id, images, total_tracks)
                    # that the Track dataclass strips to just a name string
                    track_number = None
                    disc_number = None
                    if hasattr(best_match, 'id') and best_match.id:
                        try:
                            cache = deps.get_metadata_cache()
                            _raw = cache.get_entity(discovery_source if not use_spotify else 'spotify', 'track', best_match.id)
                            if _raw and isinstance(_raw.get('album'), dict):
                                _raw_album = _raw['album']
                                if _raw_album.get('id'):
                                    album_obj['id'] = _raw_album['id']
                                if _raw_album.get('images') and not album_obj.get('images'):
                                    album_obj['images'] = _raw_album['images']
                                if _raw_album.get('total_tracks'):
                                    album_obj['total_tracks'] = _raw_album['total_tracks']
                                if _raw_album.get('album_type'):
                                    album_obj['album_type'] = _raw_album['album_type']
                                if _raw_album.get('release_date') and not album_obj.get('release_date'):
                                    album_obj['release_date'] = _raw_album['release_date']
                                if _raw_album.get('artists'):
                                    album_obj['artists'] = _raw_album['artists']
                            if _raw:
                                track_number = _raw.get('track_number')
                                disc_number = _raw.get('disc_number')
                        except Exception:
                            # Enrichment is optional; keep the un-enriched match.
                            logger.debug("Metadata cache enrichment failed for %r", track_name, exc_info=True)

                    matched_data = {
                        'id': best_match.id if hasattr(best_match, 'id') else '',
                        'name': best_match.name if hasattr(best_match, 'name') else '',
                        'artists': [{'name': a} if isinstance(a, str) else a for a in match_artists],
                        'album': album_obj,
                        'duration_ms': best_match.duration_ms if hasattr(best_match, 'duration_ms') else 0,
                        'image_url': match_image,
                        'source': discovery_source,
                    }
                    if track_number:
                        matched_data['track_number'] = track_number
                    if disc_number:
                        matched_data['disc_number'] = disc_number

                    extra_data = {
                        'discovered': True,
                        'provider': discovery_source,
                        'confidence': best_confidence,
                        'matched_data': matched_data,
                    }
                    db.update_mirrored_track_extra_data(track_id, extra_data)
                    total_discovered += 1

                    # Save to discovery cache
                    try:
                        db.save_discovery_cache_match(
                            cache_key[0], cache_key[1], discovery_source,
                            best_confidence, matched_data,
                            track_name, artist_name
                        )
                    except Exception:
                        logger.debug("Saving discovery cache entry failed for %r", track_name, exc_info=True)

                    logger.info(f"[{i+1}/{len(undiscovered_tracks)}] {track_name} → {matched_data['name']} ({best_confidence:.2f})")
                    deps.update_automation_progress(automation_id,
                                                    progress=_progress_pct(),
                                                    processed=total_discovered + total_failed,
                                                    current_item=f'{track_name} - {artist_name}',
                                                    log_line=f'{track_name} → {matched_data["name"]} ({best_confidence:.2f})', log_type='success')
                else:
                    # Auto Wing It fallback — mark as discovered with stub metadata
                    stub = deps.build_discovery_wing_it_stub(track_name, artist_name, duration_ms)
                    extra_data = {
                        'discovered': True,
                        'provider': 'wing_it_fallback',
                        'confidence': 0,
                        'wing_it_fallback': True,
                        'matched_data': stub,
                    }
                    db.update_mirrored_track_extra_data(track_id, extra_data)
                    total_discovered += 1
                    logger.info(f"[{i+1}/{len(undiscovered_tracks)}] Wing It: {track_name} by {artist_name}")
                    deps.update_automation_progress(automation_id,
                                                    progress=_progress_pct(),
                                                    processed=total_discovered + total_failed,
                                                    current_item=f'{track_name} - {artist_name}',
                                                    log_line=f'{track_name} by {artist_name} → wing it (no API match)', log_type='info')

                # Brief pause between live lookups.
                time.sleep(0.15)

        # Emit completion event only if new tracks were actually discovered
        # (no point triggering downstream sync if nothing changed)
        try:
            if deps.automation_engine and total_discovered > 0:
                _disc_pl_id = str(playlists[0]['id']) if len(playlists) == 1 else ''
                deps.automation_engine.emit('discovery_completed', {
                    'playlist_name': last_playlist_name if len(playlists) == 1 else f'{len(playlists)} playlists',
                    'playlist_id': _disc_pl_id,
                    'total_tracks': str(total_tracks),
                    'discovered_count': str(total_discovered),
                    'failed_count': str(total_failed),
                    'skipped_count': str(total_skipped),
                })
        except Exception:
            logger.debug("Emitting discovery_completed failed", exc_info=True)

        # Successful completion is informational, not an error.
        logger.info(f"Playlist discovery complete: {total_discovered} discovered, {total_failed} failed, {total_skipped} skipped")
        deps.update_automation_progress(automation_id, status='finished', progress=100,
                                        phase='Discovery complete',
                                        log_line=f'Done: {total_discovered} discovered, {total_failed} failed, {total_skipped} skipped',
                                        log_type='success')

    except Exception as e:
        logger.error(f"Error in playlist discovery worker: {e}")
        import traceback
        traceback.print_exc()
        deps.update_automation_progress(automation_id, status='error', progress=100,
                                        phase='Error',
                                        log_line=f'Error: {str(e)}', log_type='error')
    finally:
        deps.resume_enrichment_workers(_ew_state, 'mirrored playlist discovery')

@ -0,0 +1,410 @@
"""Tests for core/discovery/playlist.py — mirrored playlist discovery worker."""
from __future__ import annotations
import json
from dataclasses import dataclass
import pytest
from core.discovery import playlist as dp
# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------
@dataclass
class _FakeMatch:
    """Search-result stand-in exposing the attributes the worker reads off a match."""

    id: str = 'id-1'
    name: str = 'Match Name'
    artists: list = None
    album: str = 'Match Album'
    duration_ms: int = 200000
    image_url: str = ''
    release_date: str = '2024-01-01'

    def __post_init__(self):
        # None is the "not supplied" marker; give each instance its own list.
        self.artists = ['Match Artist'] if self.artists is None else self.artists
class _FakeSpotifyClient:
    """Spotify client double: canned search results plus a call log."""

    def __init__(self, results=None, authenticated=True):
        self.search_calls = []
        self._authenticated = authenticated
        self._results = [] if results is None else results

    def is_spotify_authenticated(self):
        """Report the authentication state given at construction."""
        return self._authenticated

    def search_tracks(self, query, limit=10):
        """Record ``(query, limit)`` and hand back the canned results."""
        call = (query, limit)
        self.search_calls.append(call)
        return self._results
class _FakeITunesClient:
    """Fallback-provider double: canned search results plus a call log."""

    def __init__(self, results=None):
        self.search_calls = []
        self._results = [] if results is None else results

    def search_tracks(self, query, limit=10):
        """Record ``(query, limit)`` and hand back the canned results."""
        call = (query, limit)
        self.search_calls.append(call)
        return self._results
class _FakeMatchingEngine:
    """Matching-engine double producing a single "<first artist> <name>" query."""

    def generate_download_queries(self, t):
        lead_artist = t.artists[0]
        title = t.name
        return [f"{lead_artist} {title}"]
class _FakeAutomationEngine:
    """Automation-engine double that just remembers every emitted event."""

    def __init__(self):
        self.events = []

    def emit(self, event_type, data):
        """Store the event as an ``(event_type, data)`` pair."""
        entry = (event_type, data)
        self.events.append(entry)
class _FakeDB:
    """In-memory database double recording extra_data writes and cache saves."""

    def __init__(self, tracks_by_playlist=None, cache_match=None):
        self.extra_data_writes = []
        self.cache_saves = []
        self._cache_match = cache_match
        self._tracks = tracks_by_playlist if tracks_by_playlist else {}

    def get_mirrored_playlist_tracks(self, pl_id):
        """Tracks registered for ``pl_id``; an empty list for unknown ids."""
        return self._tracks.get(pl_id, [])

    def get_discovery_cache_match(self, title, artist, source):
        """Always answer with the configured cache match (may be ``None``)."""
        return self._cache_match

    def update_mirrored_track_extra_data(self, track_id, extra_data):
        """Log the write as ``(track_id, extra_data)``."""
        write = (track_id, extra_data)
        self.extra_data_writes.append(write)

    def save_discovery_cache_match(self, title, artist, source, conf, data, raw_t, raw_a):
        """Log only the key fields of the save: title, artist, source, confidence."""
        saved = (title, artist, source, conf)
        self.cache_saves.append(saved)
class _FakeMetadataCache:
    """Metadata-cache double that never has an entry."""

    def get_entity(self, source, kind, entity_id):
        # Every lookup is a miss.
        return None
def _build_deps(
    *,
    spotify_results=None,
    spotify_auth=True,
    itunes_results=None,
    discovery_source='spotify',
    cache_match=None,
    tracks_by_playlist=None,
    cancellation_set=None,
    fallback_source='itunes',
    score_result=(None, 0.0, 0),
    auto_progress_log=None,
    activity_log=None,
):
    """Assemble a ``PlaylistDiscoveryDeps`` wired entirely to in-memory fakes.

    The fakes are also hung on the returned object (``_db``, ``_spotify``,
    ``_itunes``, ``_auto``, ``_auto_progress_log``) so tests can inspect what
    the worker did. ``activity_log`` is accepted but not used.
    """
    if auto_progress_log is None:
        auto_progress_log = []
    cancelled = set() if cancellation_set is None else cancellation_set

    fake_db = _FakeDB(tracks_by_playlist=tracks_by_playlist or {}, cache_match=cache_match)
    fake_spotify = _FakeSpotifyClient(results=spotify_results or [], authenticated=spotify_auth)
    fake_itunes = _FakeITunesClient(results=itunes_results or [])
    fake_engine = _FakeAutomationEngine()

    def _record_progress(*a, **kw):
        auto_progress_log.append((a, kw))

    def _wing_it_stub(title, artist, dur):
        return {'name': title, 'artists': [artist], 'duration_ms': dur, 'wing_it': True}

    deps = dp.PlaylistDiscoveryDeps(
        spotify_client=fake_spotify,
        matching_engine=_FakeMatchingEngine(),
        automation_engine=fake_engine,
        playlist_discovery_cancelled=cancelled,
        pause_enrichment_workers=lambda label: {'paused': True},
        resume_enrichment_workers=lambda state, label: None,
        get_active_discovery_source=lambda: discovery_source,
        get_metadata_fallback_client=lambda: fake_itunes,
        get_metadata_fallback_source=lambda: fallback_source,
        update_automation_progress=_record_progress,
        get_database=lambda: fake_db,
        get_discovery_cache_key=lambda title, artist: (title.lower(), artist.lower()),
        validate_discovery_cache_artist=lambda artist, m: True,
        discovery_score_candidates=lambda *args, **kw: score_result,
        get_metadata_cache=lambda: _FakeMetadataCache(),
        build_discovery_wing_it_stub=_wing_it_stub,
    )

    # Inspection handles for the tests (the dataclass allows ad-hoc attributes).
    deps._db = fake_db
    deps._spotify = fake_spotify
    deps._itunes = fake_itunes
    deps._auto = fake_engine
    deps._auto_progress_log = auto_progress_log
    return deps
def _track(track_id=1, name='Track', artist='Artist', duration_ms=180000, extra_data=None):
    """Build a mirrored-track row dict; non-string ``extra_data`` is JSON-encoded."""
    row = dict(
        id=track_id,
        track_name=name,
        artist_name=artist,
        duration_ms=duration_ms,
    )
    if extra_data is None:
        # No 'extra_data' key at all for tracks without discovery state.
        return row
    if isinstance(extra_data, str):
        row['extra_data'] = extra_data
    else:
        row['extra_data'] = json.dumps(extra_data)
    return row
def _playlist(pl_id='p1', name='My Playlist', source='spotify'):
    """Build the minimal playlist mapping the worker consumes."""
    return dict(id=pl_id, name=name, source=source)
# ---------------------------------------------------------------------------
# Empty / no work
# ---------------------------------------------------------------------------
def test_no_playlists_runs_clean():
    """With no playlists the worker still reports a 'finished' status."""
    deps = _build_deps()

    dp.run_playlist_discovery_worker([], automation_id='auto-1', deps=deps)

    reported_statuses = [kw.get('status') for _, kw in deps._auto_progress_log]
    assert 'finished' in reported_statuses
def test_playlist_with_no_tracks_skipped():
    """A playlist without tracks is passed over and nothing is written."""
    empty_playlist = {'p1': []}
    deps = _build_deps(tracks_by_playlist=empty_playlist)

    dp.run_playlist_discovery_worker([_playlist('p1')], deps=deps)

    assert deps._db.extra_data_writes == []
# ---------------------------------------------------------------------------
# Already-discovered skip logic
# ---------------------------------------------------------------------------
def test_complete_discovery_skipped():
    """A discovered track with complete metadata is left untouched."""
    complete_album = {'release_date': '2024-01-01', 'id': 'a1'}
    complete_extra = {
        'discovered': True,
        'matched_data': {'track_number': 5, 'album': complete_album},
    }
    deps = _build_deps(
        tracks_by_playlist={'p1': [_track(track_id=1, extra_data=complete_extra)]},
    )

    dp.run_playlist_discovery_worker([_playlist('p1')], deps=deps)

    # Nothing written means no re-discovery happened.
    assert deps._db.extra_data_writes == []
def test_incomplete_discovery_redone():
    """A discovered track lacking track_number/release_date is discovered again."""
    # matched_data has neither track_number nor an album release_date.
    incomplete_extra = {'discovered': True, 'matched_data': {'album': {}}}
    deps = _build_deps(
        tracks_by_playlist={'p1': [_track(track_id=1, extra_data=incomplete_extra)]},
    )

    dp.run_playlist_discovery_worker([_playlist('p1')], deps=deps)

    # The default score_result yields no match, so the rewrite is a Wing It stub.
    writes = deps._db.extra_data_writes
    assert len(writes) == 1
def test_wing_it_fallback_always_redone():
    """A previous Wing It stub is always retried."""
    stub_extra = {'discovered': True, 'wing_it_fallback': True, 'matched_data': {}}
    deps = _build_deps(
        tracks_by_playlist={'p1': [_track(track_id=1, extra_data=stub_extra)]},
    )

    dp.run_playlist_discovery_worker([_playlist('p1')], deps=deps)

    writes = deps._db.extra_data_writes
    assert len(writes) == 1
def test_unmatched_by_user_respected():
    """A track the user un-matched is skipped rather than re-discovered."""
    user_removed = {'unmatched_by_user': True}
    deps = _build_deps(
        tracks_by_playlist={'p1': [_track(track_id=1, extra_data=user_removed)]},
    )

    dp.run_playlist_discovery_worker([_playlist('p1')], deps=deps)

    assert deps._db.extra_data_writes == []
# ---------------------------------------------------------------------------
# Cache hit short-circuit
# ---------------------------------------------------------------------------
def test_cache_hit_short_circuits():
    """A discovery-cache hit is written straight to extra_data with no live search."""
    cached = {'name': 'Cached Match', 'artists': ['CA'], 'confidence': 0.9}
    deps = _build_deps(
        tracks_by_playlist={'p1': [_track(track_id=1)]},
        cache_match=cached,
    )

    dp.run_playlist_discovery_worker([_playlist('p1')], deps=deps)

    writes = deps._db.extra_data_writes
    assert len(writes) == 1
    _written_id, written_extra = writes[0]
    assert written_extra['discovered'] is True
    assert written_extra['matched_data'] == cached
    # The provider client must not have been queried.
    assert deps._spotify.search_calls == []
# ---------------------------------------------------------------------------
# Live search match
# ---------------------------------------------------------------------------
def test_match_above_threshold_writes_extra_data():
    """A confident match is stored in extra_data and saved to the discovery cache."""
    hit = _FakeMatch()
    deps = _build_deps(
        tracks_by_playlist={'p1': [_track(track_id=1)]},
        spotify_results=[hit],
        score_result=(hit, 0.92, 0),
    )

    dp.run_playlist_discovery_worker([_playlist('p1')], deps=deps)

    writes = deps._db.extra_data_writes
    assert len(writes) == 1
    written_extra = writes[0][1]
    assert written_extra['discovered'] is True
    assert written_extra['provider'] == 'spotify'
    assert written_extra['confidence'] == 0.92
    # A non-empty list proves the match was cached.
    assert deps._db.cache_saves
def test_match_below_threshold_falls_back_to_wing_it():
    """A match scoring under the 0.7 threshold is replaced by a Wing It stub."""
    weak_hit = _FakeMatch()
    deps = _build_deps(
        tracks_by_playlist={'p1': [_track(track_id=1)]},
        spotify_results=[weak_hit],
        score_result=(weak_hit, 0.5, 0),
    )

    dp.run_playlist_discovery_worker([_playlist('p1')], deps=deps)

    writes = deps._db.extra_data_writes
    assert len(writes) == 1
    written_extra = writes[0][1]
    assert written_extra['provider'] == 'wing_it_fallback'
    assert written_extra['wing_it_fallback'] is True
# ---------------------------------------------------------------------------
# iTunes fallback
# ---------------------------------------------------------------------------
def test_itunes_fallback_when_spotify_unauthenticated():
    """When Spotify is not authenticated the fallback client does the searching."""
    hit = _FakeMatch()
    deps = _build_deps(
        tracks_by_playlist={'p1': [_track(track_id=1)]},
        spotify_auth=False,
        discovery_source='itunes',
        itunes_results=[hit],
        score_result=(hit, 0.95, 0),
    )

    dp.run_playlist_discovery_worker([_playlist('p1')], deps=deps)

    assert deps._spotify.search_calls == []
    assert deps._itunes.search_calls
def test_neither_provider_available_returns_error():
    """Unauthenticated Spotify plus a failing fallback → error status, nothing written."""
    def _no_fallback():
        raise RuntimeError("no fallback")

    deps = _build_deps(
        tracks_by_playlist={'p1': [_track(track_id=1)]},
        spotify_auth=False,
    )
    deps.get_metadata_fallback_client = _no_fallback

    dp.run_playlist_discovery_worker([_playlist('p1')], automation_id='a1', deps=deps)

    # The worker bailed out before touching any track.
    assert deps._db.extra_data_writes == []
    reported_statuses = [kw.get('status') for _, kw in deps._auto_progress_log]
    assert 'error' in reported_statuses
# ---------------------------------------------------------------------------
# Cancellation
# ---------------------------------------------------------------------------
def test_cancellation_aborts_loop():
    """automation_id in cancellation set → finish + return before any track is processed.

    Beyond draining the cancellation set, this pins the actual abort: no
    track written, no provider searched, no completion event, and a
    'Discovery cancelled' finished status reported.
    """
    tracks = [_track(track_id=1), _track(track_id=2)]
    cancel_set = {'auto-stop'}
    deps = _build_deps(
        tracks_by_playlist={'p1': tracks},
        cancellation_set=cancel_set,
    )
    dp.run_playlist_discovery_worker([_playlist('p1')], automation_id='auto-stop', deps=deps)
    # The cancellation request is consumed...
    assert 'auto-stop' not in cancel_set
    # ...and it took effect before the first track was handled.
    assert deps._db.extra_data_writes == []
    assert deps._spotify.search_calls == []
    assert deps._auto.events == []
    assert any(
        kw.get('status') == 'finished' and kw.get('phase') == 'Discovery cancelled'
        for _, kw in deps._auto_progress_log
    )
# ---------------------------------------------------------------------------
# Completion event emission
# ---------------------------------------------------------------------------
def test_discovery_completed_event_emitted():
    """Discovering at least one track fires the 'discovery_completed' event."""
    hit = _FakeMatch()
    deps = _build_deps(
        tracks_by_playlist={'p1': [_track(track_id=1)]},
        spotify_results=[hit],
        score_result=(hit, 0.92, 0),
    )

    dp.run_playlist_discovery_worker([_playlist('p1')], deps=deps)

    emitted_names = [event_name for event_name, _ in deps._auto.events]
    assert 'discovery_completed' in emitted_names
def test_no_event_when_nothing_discovered():
    """If every track is skipped, no 'discovery_completed' event is emitted."""
    complete_album = {'release_date': '2024-01-01', 'id': 'a1'}
    already_done = {
        'discovered': True,
        'matched_data': {'track_number': 5, 'album': complete_album},
    }
    deps = _build_deps(
        tracks_by_playlist={'p1': [_track(track_id=1, extra_data=already_done)]},
    )

    dp.run_playlist_discovery_worker([_playlist('p1')], deps=deps)

    assert deps._auto.events == []
# ---------------------------------------------------------------------------
# Multi-playlist
# ---------------------------------------------------------------------------
def test_multi_playlist_aggregates_grand_total():
    """Multiple playlists → grand_total counted across all.

    Checks the aggregate the name promises (the ``total=`` progress update
    equals the track count over both playlists) as well as the per-track
    writes, in playlist order.
    """
    match = _FakeMatch()
    tracks_p1 = [_track(track_id=1)]
    tracks_p2 = [_track(track_id=2), _track(track_id=3)]
    deps = _build_deps(
        tracks_by_playlist={'p1': tracks_p1, 'p2': tracks_p2},
        spotify_results=[match],
        score_result=(match, 0.92, 0),
    )
    dp.run_playlist_discovery_worker([_playlist('p1'), _playlist('p2')], deps=deps)
    # grand_total is reported once up front: 1 track in p1 + 2 in p2.
    assert any(kw.get('total') == 3 for _, kw in deps._auto_progress_log)
    # All 3 tracks discovered → 3 extra_data writes, p1 before p2.
    assert len(deps._db.extra_data_writes) == 3
    assert [track_id for track_id, _ in deps._db.extra_data_writes] == [1, 2, 3]

@ -25832,329 +25832,37 @@ def _sync_discovery_results_to_mirrored(source_type, source_playlist_id, discove
traceback.print_exc()
def _run_playlist_discovery_worker(playlists, automation_id=None):
"""Background worker that discovers Spotify/iTunes metadata for undiscovered
mirrored playlist tracks. Stores results in extra_data for use by sync."""
_ew_state = {}
try:
_ew_state = _pause_enrichment_workers('mirrored playlist discovery')
discovery_source = _get_active_discovery_source()
use_spotify = (discovery_source == 'spotify') and spotify_client and spotify_client.is_spotify_authenticated()
# Mirrored-playlist discovery worker logic lives in core/discovery/playlist.py.
from core.discovery import playlist as _discovery_playlist
itunes_client_instance = None
if not use_spotify:
try:
itunes_client_instance = _get_metadata_fallback_client()
except Exception:
logger.warning(f"Neither Spotify nor {_get_metadata_fallback_source()} available for discovery")
_update_automation_progress(automation_id, status='error', progress=100,
phase='Error', log_line=f'Neither Spotify nor {_get_metadata_fallback_source()} available',
log_type='error')
return
total_discovered = 0
total_failed = 0
total_skipped = 0
total_tracks = 0
last_playlist_name = ''
# Pre-compute grand total for progress tracking
grand_total = 0
db_init = get_database()
for pl in playlists:
t = db_init.get_mirrored_playlist_tracks(pl['id'])
if t:
grand_total += len(t)
_update_automation_progress(automation_id, total=grand_total)
for pl in playlists:
pl_id = pl['id']
pl_name = pl.get('name', '')
last_playlist_name = pl_name
source = pl.get('source', '')
db = get_database()
tracks = db.get_mirrored_playlist_tracks(pl_id)
if not tracks:
continue
logger.info(f"Starting discovery for playlist '{pl_name}' ({len(tracks)} tracks, using {discovery_source.upper()})")
_update_automation_progress(automation_id, phase=f'Discovering: "{pl_name}"',
log_line=f'Playlist "{pl_name}"{len(tracks)} tracks ({discovery_source.upper()})', log_type='info')
# Fast pre-scan: separate already-discovered from undiscovered
undiscovered_tracks = []
pl_skipped = 0
for track in tracks:
existing_extra = {}
if track.get('extra_data'):
try:
existing_extra = json.loads(track['extra_data']) if isinstance(track['extra_data'], str) else track['extra_data']
except (json.JSONDecodeError, TypeError):
pass
if existing_extra.get('discovered'):
if existing_extra.get('wing_it_fallback'):
# Wing It stub — always re-attempt to find a real match
undiscovered_tracks.append(track)
else:
# Check if matched_data is complete — old discoveries may be missing
# track_number/release_date due to the Track dataclass stripping them.
# Re-discover these so the enriched pipeline fills in the gaps.
md = existing_extra.get('matched_data', {})
album = md.get('album', {})
has_track_num = md.get('track_number')
has_release = album.get('release_date') if isinstance(album, dict) else None
has_album_id = album.get('id') if isinstance(album, dict) else None
if has_track_num and (has_release or has_album_id):
pl_skipped += 1
total_skipped += 1
else:
# Incomplete discovery — re-discover to get full metadata
undiscovered_tracks.append(track)
elif existing_extra.get('unmatched_by_user'):
# User explicitly removed this match — respect their choice
pl_skipped += 1
total_skipped += 1
else:
undiscovered_tracks.append(track)
if pl_skipped > 0:
_update_automation_progress(automation_id,
log_line=f'{pl_skipped} tracks already discovered — skipped', log_type='skip')
if not undiscovered_tracks:
_update_automation_progress(automation_id,
progress=((total_skipped + total_discovered + total_failed) / max(1, grand_total)) * 100,
log_line=f'All {len(tracks)} tracks already discovered', log_type='skip')
continue
_update_automation_progress(automation_id,
log_line=f'{len(undiscovered_tracks)} tracks to discover', log_type='info')
for i, track in enumerate(undiscovered_tracks):
# Check for cancellation
if automation_id and automation_id in _playlist_discovery_cancelled:
_playlist_discovery_cancelled.discard(automation_id)
logger.warning(f"Playlist discovery cancelled (automation {automation_id})")
_update_automation_progress(automation_id, status='finished', progress=100,
phase='Discovery cancelled',
log_line=f'Cancelled: {total_discovered} discovered, {total_failed} failed',
log_type='info')
return
total_tracks += 1
track_id = track['id']
track_name = track.get('track_name', '')
artist_name = track.get('artist_name', '')
duration_ms = track.get('duration_ms', 0)
# Step 1: Check discovery cache
cache_key = _get_discovery_cache_key(track_name, artist_name)
try:
cached_match = db.get_discovery_cache_match(cache_key[0], cache_key[1], discovery_source)
if cached_match and _validate_discovery_cache_artist(artist_name, cached_match):
extra_data = {
'discovered': True,
'provider': discovery_source,
'confidence': cached_match.get('confidence', 0.85),
'matched_data': cached_match,
}
db.update_mirrored_track_extra_data(track_id, extra_data)
total_discovered += 1
logger.info(f"CACHE [{i+1}/{len(undiscovered_tracks)}]: {track_name}{cached_match.get('name', '?')}")
_update_automation_progress(automation_id,
progress=((total_skipped + total_discovered + total_failed) / max(1, grand_total)) * 100,
current_item=track_name,
log_line=f'{track_name}{cached_match.get("name", "?")} (cache)', log_type='success')
continue
except Exception:
pass
# Step 2: Generate search queries
try:
temp_track = type('TempTrack', (), {
'name': track_name,
'artists': [artist_name],
'album': None
})()
search_queries = matching_engine.generate_download_queries(temp_track)
except Exception:
search_queries = [f"{artist_name} {track_name}", track_name]
# Step 3: Search and score
best_match = None
best_confidence = 0.0
min_confidence = 0.7
for search_query in search_queries:
try:
if use_spotify:
results = spotify_client.search_tracks(search_query, limit=10)
else:
results = itunes_client_instance.search_tracks(search_query, limit=10)
if not results:
continue
match, confidence, _ = _discovery_score_candidates(
track_name, artist_name, duration_ms, results
)
if match and confidence > best_confidence:
best_confidence = confidence
best_match = match
if best_confidence >= 0.9:
break
except Exception:
continue
# Extended search fallback
if not best_match or best_confidence < min_confidence:
try:
query = f"{artist_name} {track_name}"
if use_spotify:
extended = spotify_client.search_tracks(query, limit=50)
else:
extended = itunes_client_instance.search_tracks(query, limit=50)
if extended:
match, confidence, _ = _discovery_score_candidates(
track_name, artist_name, duration_ms, extended
)
if match and confidence > best_confidence:
best_confidence = confidence
best_match = match
except Exception:
pass
# Step 4: Store results
if best_match and best_confidence >= min_confidence:
match_artists = best_match.artists if hasattr(best_match, 'artists') else []
match_image = getattr(best_match, 'image_url', None)
album_name = best_match.album if hasattr(best_match, 'album') else ''
album_obj = {'name': album_name, 'release_date': getattr(best_match, 'release_date', '') or ''}
if match_image:
album_obj['images'] = [{'url': match_image, 'height': 600, 'width': 600}]
# Enrich album data from metadata cache — search_tracks() caches the
# raw API response which has full album info (id, images, total_tracks)
# that the Track dataclass strips to just a name string
track_number = None
disc_number = None
if hasattr(best_match, 'id') and best_match.id:
try:
cache = get_metadata_cache()
_raw = cache.get_entity(discovery_source if not use_spotify else 'spotify', 'track', best_match.id)
if _raw and isinstance(_raw.get('album'), dict):
_raw_album = _raw['album']
if _raw_album.get('id'):
album_obj['id'] = _raw_album['id']
if _raw_album.get('images') and not album_obj.get('images'):
album_obj['images'] = _raw_album['images']
if _raw_album.get('total_tracks'):
album_obj['total_tracks'] = _raw_album['total_tracks']
if _raw_album.get('album_type'):
album_obj['album_type'] = _raw_album['album_type']
if _raw_album.get('release_date') and not album_obj.get('release_date'):
album_obj['release_date'] = _raw_album['release_date']
if _raw_album.get('artists'):
album_obj['artists'] = _raw_album['artists']
if _raw:
track_number = _raw.get('track_number')
disc_number = _raw.get('disc_number')
except Exception:
pass
matched_data = {
'id': best_match.id if hasattr(best_match, 'id') else '',
'name': best_match.name if hasattr(best_match, 'name') else '',
'artists': [{'name': a} if isinstance(a, str) else a for a in match_artists],
'album': album_obj,
'duration_ms': best_match.duration_ms if hasattr(best_match, 'duration_ms') else 0,
'image_url': match_image,
'source': discovery_source,
}
if track_number:
matched_data['track_number'] = track_number
if disc_number:
matched_data['disc_number'] = disc_number
extra_data = {
'discovered': True,
'provider': discovery_source,
'confidence': best_confidence,
'matched_data': matched_data,
}
db.update_mirrored_track_extra_data(track_id, extra_data)
total_discovered += 1
# Save to discovery cache
try:
db.save_discovery_cache_match(
cache_key[0], cache_key[1], discovery_source,
best_confidence, matched_data,
track_name, artist_name
)
except Exception:
pass
logger.info(f"[{i+1}/{len(undiscovered_tracks)}] {track_name}{matched_data['name']} ({best_confidence:.2f})")
_update_automation_progress(automation_id,
progress=((total_skipped + total_discovered + total_failed) / max(1, grand_total)) * 100,
processed=total_discovered + total_failed,
current_item=f'{track_name} - {artist_name}',
log_line=f'{track_name}{matched_data["name"]} ({best_confidence:.2f})', log_type='success')
else:
# Auto Wing It fallback — mark as discovered with stub metadata
stub = _build_discovery_wing_it_stub(track_name, artist_name, duration_ms)
extra_data = {
'discovered': True,
'provider': 'wing_it_fallback',
'confidence': 0,
'wing_it_fallback': True,
'matched_data': stub,
}
db.update_mirrored_track_extra_data(track_id, extra_data)
total_discovered += 1
logger.info(f"[{i+1}/{len(undiscovered_tracks)}] Wing It: {track_name} by {artist_name}")
_update_automation_progress(automation_id,
progress=((total_skipped + total_discovered + total_failed) / max(1, grand_total)) * 100,
processed=total_discovered + total_failed,
current_item=f'{track_name} - {artist_name}',
log_line=f'{track_name} by {artist_name} → wing it (no API match)', log_type='info')
time.sleep(0.15)
def _build_playlist_discovery_deps():
    """Assemble a fresh PlaylistDiscoveryDeps from web_server.py module globals.

    The globals are read at call time (not import time); a new bundle is
    returned on every call.
    """
    deps_cls = _discovery_playlist.PlaylistDiscoveryDeps
    bundle = {
        # Shared clients / engines and the cancellation set.
        'spotify_client': spotify_client,
        'matching_engine': matching_engine,
        'automation_engine': automation_engine,
        'playlist_discovery_cancelled': _playlist_discovery_cancelled,
        # Callable helpers that stay defined in web_server.py.
        'pause_enrichment_workers': _pause_enrichment_workers,
        'resume_enrichment_workers': _resume_enrichment_workers,
        'get_active_discovery_source': _get_active_discovery_source,
        'get_metadata_fallback_client': _get_metadata_fallback_client,
        'get_metadata_fallback_source': _get_metadata_fallback_source,
        'update_automation_progress': _update_automation_progress,
        'get_database': get_database,
        'get_discovery_cache_key': _get_discovery_cache_key,
        'validate_discovery_cache_artist': _validate_discovery_cache_artist,
        'discovery_score_candidates': _discovery_score_candidates,
        'get_metadata_cache': get_metadata_cache,
        'build_discovery_wing_it_stub': _build_discovery_wing_it_stub,
    }
    return deps_cls(**bundle)
# Emit completion event only if new tracks were actually discovered
# (no point triggering downstream sync if nothing changed)
try:
    if automation_engine and total_discovered > 0:
        # A single-playlist run reports that playlist's id and name; a
        # multi-playlist run reports an empty id and an "N playlists" label.
        _disc_pl_id = str(playlists[0]['id']) if len(playlists) == 1 else ''
        automation_engine.emit('discovery_completed', {
            'playlist_name': last_playlist_name if len(playlists) == 1 else f'{len(playlists)} playlists',
            'playlist_id': _disc_pl_id,
            'total_tracks': str(total_tracks),
            'discovered_count': str(total_discovered),
            'failed_count': str(total_failed),
            'skipped_count': str(total_skipped),
        })
except Exception as emit_err:
    # Best-effort notification: a failing emit must not abort the run, but
    # leave a trace instead of swallowing the error silently.
    logger.debug(f"discovery_completed emit failed: {emit_err}")
# Normal completion is informational; logging it at ERROR level would put
# every successful run in the error log.
logger.info(f"Playlist discovery complete: {total_discovered} discovered, {total_failed} failed, {total_skipped} skipped")
_update_automation_progress(automation_id, status='finished', progress=100,
                            phase='Discovery complete',
                            log_line=f'Done: {total_discovered} discovered, {total_failed} failed, {total_skipped} skipped',
                            log_type='success')
def _run_playlist_discovery_worker(playlists, automation_id=None):
    """Run mirrored-playlist discovery via core/discovery/playlist.py.

    Thin wrapper that keeps the original entry-point name: it forwards
    ``playlists`` and ``automation_id`` together with a dependency bundle
    built by ``_build_playlist_discovery_deps()`` and returns whatever the
    lifted worker returns.
    """
    worker = _discovery_playlist.run_playlist_discovery_worker
    deps = _build_playlist_discovery_deps()
    return worker(playlists, automation_id, deps)
except Exception as e:
logger.error(f"Error in playlist discovery worker: {e}")
import traceback
traceback.print_exc()
_update_automation_progress(automation_id, status='error', progress=100,
phase='Error',
log_line=f'Error: {str(e)}', log_type='error')
finally:
_resume_enrichment_workers(_ew_state, 'mirrored playlist discovery')
def _extract_artist_name(artist):

Loading…
Cancel
Save