"""Background worker for YouTube playlist discovery.

`run_youtube_discovery_worker(url_hash, deps)` is the function
`youtube_discovery_executor.submit(...)` invokes to match each YouTube
playlist track against Spotify (preferred) or iTunes (fallback):

1. Pause enrichment workers (release shared resources).
2. For each YouTube track:
   - Check discovery cache; cache hit short-circuits the search.
   - Strategy 1: matching_engine search queries with confidence scoring.
   - Strategy 2: swapped artist/title query.
   - Strategy 3: raw (untokenized) query.
   - Strategy 4: extended search with limit=50.
   - On match → save to discovery cache.
   - On miss → build a Wing It stub from raw source data.
3. After all tracks: mark phase 'discovered', sort results by index, and
   for mirrored playlists write extra_data back to the DB.
4. Activity feed entry with match summary.
5. On error → state['status'] = 'error', phase reset to 'fresh'.
6. Finally: resume enrichment workers.

Lifted from web_server.py. Wide dependency surface (Spotify and iTunes
clients, matching engine, multiple metadata helpers, state dicts,
database access) all injected via `YoutubeDiscoveryDeps`.

Happy-path behaviour matches the original. The error paths are hardened:
a failure before the playlist state is resolved, a track with missing
keys, or a failing activity-feed call no longer escalates into a second
exception or discards a completed run.
"""

from __future__ import annotations

import logging
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Callable

logger = logging.getLogger(__name__)

# Minimum score a candidate needs before it is accepted as a match.
_MIN_CONFIDENCE = 0.9
# Score at which Strategy 1 stops trying further generated queries.
_HIGH_CONFIDENCE = 0.9
# Minimum score for persisting a match in the discovery cache.
# NOTE(review): every accepted match already scores >= _MIN_CONFIDENCE, so
# this gate is currently always true; kept to preserve the original intent.
_CACHE_MIN_CONFIDENCE = 0.7
# Label handed to the enrichment pause/resume hooks.
_WORKER_LABEL = 'YouTube discovery'


@dataclass
class YoutubeDiscoveryDeps:
    """Bundle of cross-cutting deps the YouTube discovery worker needs."""
    # url_hash -> per-playlist state dict; the worker mutates it in place.
    youtube_playlist_states: dict
    # Must expose is_spotify_authenticated() and search_tracks(query, limit=...).
    spotify_client: Any
    # Must expose generate_download_queries(track) returning a sequence of queries.
    matching_engine: Any
    # Called with a label; its return value is handed back to the resume hook.
    pause_enrichment_workers: Callable[[str], dict]
    # Called in `finally` with the pause hook's return value and the same label.
    resume_enrichment_workers: Callable[[dict, str], None]
    # Returns the provider name; 'spotify' selects the Spotify client.
    get_active_discovery_source: Callable[[], str]
    # Returns the fallback client (must expose search_tracks(query, limit=...)).
    get_metadata_fallback_client: Callable[[], Any]
    # (title, artist) -> indexable pair used as the discovery-cache key.
    get_discovery_cache_key: Callable
    # (artist, cached_match) -> truthy when the cached match may be reused.
    validate_discovery_cache_artist: Callable
    # Converts one entry of an `artists` list into a display name.
    extract_artist_name: Callable
    # True while Spotify searches should be skipped in Strategy 1.
    spotify_rate_limited: Callable[[], bool]
    # (title, artist, duration_ms, results) -> (match, confidence, index).
    discovery_score_candidates: Callable
    # Returns an object exposing get_entity(source, kind, entity_id).
    get_metadata_cache: Callable[[], Any]
    # (title, artist, duration_ms) -> stub used when no match is found.
    build_discovery_wing_it_stub: Callable
    # Returns the DB handle used for the discovery cache and mirrored write-back.
    get_database: Callable[[], Any]
    # Called once on completion as add_activity_item("", title, message, "Now").
    add_activity_item: Callable


def _format_duration(duration_ms: Any) -> str:
    """Render a millisecond duration as ``M:SS``; missing/zero gives '0:00'."""
    if not duration_ms:
        return '0:00'
    return f"{duration_ms // 60000}:{(duration_ms % 60000) // 1000:02d}"


def _first_artist(match: Any) -> Any:
    """Return the first entry of ``match.artists``, or '' when there is none."""
    artists = getattr(match, 'artists', None)
    return artists[0] if artists else ''


def _match_label(match: Any) -> str:
    """Return ``'<first artist> - <name>'`` for log messages."""
    return f"{_first_artist(match)} - {match.name}"


def _error_result(index: int, track: Any) -> dict:
    """Build the 'Error' row for a track without raising on missing keys."""
    getter = track.get if isinstance(track, dict) else (lambda key, default=None: default)
    artists = getter('artists')
    return {
        'index': index,
        'yt_track': getter('name', ''),
        'yt_artist': artists[0] if artists else 'Unknown',
        'status': 'Error',
        'status_class': 'error',
        'spotify_track': '',
        'spotify_artist': '',
        'spotify_album': '',
        'duration': '0:00',
    }


def _cached_result(deps: YoutubeDiscoveryDeps, index: int, track: dict, title: str,
                   artist: str, cached_match: dict, discovery_source: str) -> dict:
    """Build the 'Found' row for a discovery-cache hit."""
    artists = cached_match.get('artists')
    album = cached_match.get('album')
    return {
        'index': index,
        'yt_track': title,
        'yt_artist': artist,
        'status': 'Found',
        'status_class': 'found',
        'spotify_track': cached_match.get('name', ''),
        'spotify_artist': deps.extract_artist_name(artists[0]) if artists else '',
        'spotify_album': album.get('name', '') if isinstance(album, dict) else cached_match.get('album', ''),
        'duration': _format_duration(track.get('duration_ms')),
        'discovery_source': discovery_source,
        'matched_data': cached_match,
        'spotify_data': cached_match,
    }


def _search_generated_queries(deps: YoutubeDiscoveryDeps, title: str, artist: str,
                              source_duration: int, use_spotify: bool, itunes_client: Any):
    """Strategy 1: run the matching engine's queries and keep the best match.

    Returns ``(match, confidence, raw_spotify_track)``; ``match`` is None and
    ``confidence`` is 0.0 when nothing reaches ``_MIN_CONFIDENCE``.
    """
    try:
        probe = SimpleNamespace(name=title, artists=[artist], album=None)
        search_queries = deps.matching_engine.generate_download_queries(probe)
        logger.info(f"Generated {len(search_queries)} search queries for YouTube track")
    except Exception as e:
        logger.error(f"Matching engine failed for YouTube, falling back to basic query: {e}")
        search_queries = [f"{artist} {title}", title]

    matched_track = None
    best_confidence = 0.0
    best_raw_track = None

    for query_idx, search_query in enumerate(search_queries):
        try:
            logger.debug(f"YouTube query {query_idx + 1}/{len(search_queries)}: {search_query}")

            # The rate-limit check is per query so a limit hit mid-track
            # diverts the remaining queries to the fallback client.
            if use_spotify and not deps.spotify_rate_limited():
                search_results = deps.spotify_client.search_tracks(search_query, limit=10)
            else:
                search_results = itunes_client.search_tracks(search_query, limit=10)

            if not search_results:
                continue

            match, confidence, _ = deps.discovery_score_candidates(
                title, artist, source_duration, search_results
            )

            if match and confidence > best_confidence and confidence >= _MIN_CONFIDENCE:
                best_confidence = confidence
                matched_track = match
                if use_spotify and match.id:
                    best_raw_track = deps.get_metadata_cache().get_entity('spotify', 'track', match.id)
                else:
                    best_raw_track = None
                logger.info(f"New best YouTube match: {_match_label(match)} (confidence: {confidence:.3f})")

            if best_confidence >= _HIGH_CONFIDENCE:
                logger.info(f"High confidence YouTube match found ({best_confidence:.3f}), stopping search")
                break

        except Exception as e:
            # One failing query must not prevent the remaining ones from running.
            logger.debug(f"Error in YouTube search for query '{search_query}': {e}")
            continue

    return matched_track, best_confidence, best_raw_track


def _search_fallback_strategies(deps: YoutubeDiscoveryDeps, track: dict, title: str, artist: str,
                                source_duration: int, use_spotify: bool, itunes_client: Any):
    """Strategies 2-4: swapped, raw and extended searches, tried in order.

    Returns ``(match, confidence)`` for the first strategy whose best
    candidate reaches ``_MIN_CONFIDENCE``, else ``(None, 0.0)``.
    """
    raw_query = f"{track.get('raw_artist', artist)} {track.get('raw_title', title)}"
    plain_query = f"{artist} {title}"
    # (number, tag, announcement, spotify query, fallback query, limit)
    strategies = (
        (2, 'swapped', "YouTube Strategy 2: Trying swapped search (artist/title reversed)",
         f"artist:{title} track:{artist}", f"{title} {artist}", 5),
        (3, 'raw', f"YouTube Strategy 3: Trying raw data search: '{raw_query}'",
         raw_query, raw_query, 5),
        (4, 'extended', "YouTube Strategy 4: Extended search with limit=50",
         plain_query, plain_query, 50),
    )
    # NOTE(review): unlike Strategy 1 these do not consult spotify_rate_limited();
    # that matches the original behaviour and is left unchanged here.
    client = deps.spotify_client if use_spotify else itunes_client

    for number, tag, announcement, spotify_query, fallback_query, limit in strategies:
        logger.info(announcement)
        results = client.search_tracks(spotify_query if use_spotify else fallback_query, limit=limit)
        if not results:
            continue
        match, confidence, _ = deps.discovery_score_candidates(title, artist, source_duration, results)
        if match and confidence >= _MIN_CONFIDENCE:
            logger.info(f"Strategy {number} YouTube match ({tag}): {_match_label(match)} (confidence: {confidence:.3f})")
            return match, confidence

    return None, 0.0


def _matched_data(matched_track: Any, best_raw_track: Any, use_spotify: bool, discovery_source: str) -> dict:
    """Build the `matched_data` payload stored on a 'Found' row."""
    if use_spotify and best_raw_track:
        # Prefer the cached raw Spotify album object when one was found.
        album_data = best_raw_track.get('album', {}) or {}
    else:
        image_url = getattr(matched_track, 'image_url', None)
        album_data = {
            'name': matched_track.album,
            'album_type': 'album',
            'release_date': getattr(matched_track, 'release_date', '') or '',
            'images': [{'url': image_url}] if image_url else [],
        }

    # Image URL for discovery pool display.
    album_images = album_data.get('images', [])
    if album_images:
        image_url = album_images[0].get('url', '')
    else:
        image_url = getattr(matched_track, 'image_url', '') or ''

    return {
        'id': matched_track.id,
        'name': matched_track.name,
        'artists': matched_track.artists,
        'album': album_data,
        'duration_ms': matched_track.duration_ms,
        'image_url': image_url,
        'source': discovery_source,
    }


def _write_back_mirrored_results(deps: YoutubeDiscoveryDeps, url_hash: str, state: dict,
                                 tracks: list, discovery_source: str) -> None:
    """Persist each result's discovery outcome for a mirrored playlist.

    Best effort: any failure is logged and does not affect the worker state.
    """
    try:
        db = deps.get_database()
        for entry in state['discovery_results']:
            idx = entry.get('index', -1)
            if idx < 0 or idx >= len(tracks):
                continue
            db_track_id = tracks[idx].get('db_track_id')
            if not db_track_id:
                continue
            if entry.get('status_class') in ('found', 'wing-it') and entry.get('matched_data'):
                extra_data = {
                    'discovered': True,
                    'provider': entry.get('discovery_source', discovery_source),
                    'confidence': entry.get('confidence', 0),
                    'matched_data': entry['matched_data'],
                }
                if entry.get('manual_match'):
                    extra_data['manual_match'] = True
                if entry.get('wing_it_fallback'):
                    extra_data['wing_it_fallback'] = True
                    extra_data['provider'] = 'wing_it_fallback'
            else:
                extra_data = {
                    'discovered': False,
                    'discovery_attempted': True,
                    'provider': discovery_source,
                }
            db.update_mirrored_track_extra_data(db_track_id, extra_data)
        logger.info(f"Wrote discovery results to DB for {url_hash}")
    except Exception as wb_err:
        logger.error(f"Error writing discovery results to DB: {wb_err}")


def run_youtube_discovery_worker(url_hash, deps: YoutubeDiscoveryDeps):
    """Background worker for YouTube music discovery process (Spotify preferred, iTunes fallback)

    Args:
        url_hash: Key into ``deps.youtube_playlist_states``. A value starting
            with ``'mirrored_'`` additionally triggers the DB write-back.
        deps: Injected collaborators; see `YoutubeDiscoveryDeps`.

    Returns:
        None. Progress and results are written into the playlist state dict
        (``discovery_progress``, ``discovery_results``, ``spotify_matches``,
        ``wing_it_count``, ``phase``, ``status``, ``discovery_source``).

    The function does not raise for worker-level failures: they are logged
    and, when the state dict was already resolved, recorded as
    ``status='error'`` / ``phase='fresh'``. Enrichment workers are always
    resumed.
    """
    _ew_state = {}
    # Stays None until the lookup succeeds so the error handler below never
    # touches an unbound name.
    state = None
    try:
        _ew_state = deps.pause_enrichment_workers(_WORKER_LABEL)
        state = deps.youtube_playlist_states[url_hash]
        playlist = state['playlist']
        tracks = playlist['tracks']
        total = len(tracks)

        # Determine which provider to use (Spotify preferred, iTunes fallback)
        discovery_source = deps.get_active_discovery_source()
        use_spotify = bool(
            discovery_source == 'spotify'
            and deps.spotify_client
            and deps.spotify_client.is_spotify_authenticated()
        )

        # Get fallback client
        itunes_client = deps.get_metadata_fallback_client()

        logger.info(f"Starting {discovery_source} discovery for {total} YouTube tracks...")

        # Store the discovery source in state
        state['discovery_source'] = discovery_source

        for i, track in enumerate(tracks):
            try:
                # Check for cancellation (phase changed by reset/delete/close)
                if state.get('phase') != 'discovering':
                    logger.warning(f"Discovery cancelled for {url_hash} (phase changed to '{state.get('phase')}')")
                    return

                state['discovery_progress'] = int((i / total) * 100)

                # Skip tracks flagged by retry (already found)
                if track.get('skip_discovery'):
                    continue

                cleaned_title = track['name']
                cleaned_artist = track['artists'][0] if track['artists'] else 'Unknown Artist'

                logger.info(f"Searching {discovery_source} for: '{cleaned_artist}' - '{cleaned_title}'")

                # Check discovery cache first
                cache_key = deps.get_discovery_cache_key(cleaned_title, cleaned_artist)
                try:
                    cached_match = deps.get_database().get_discovery_cache_match(
                        cache_key[0], cache_key[1], discovery_source
                    )
                    if cached_match and deps.validate_discovery_cache_artist(cleaned_artist, cached_match):
                        logger.debug(f"CACHE HIT [{i+1}/{total}]: {cleaned_artist} - {cleaned_title}")
                        result = _cached_result(
                            deps, i, track, cleaned_title, cleaned_artist, cached_match, discovery_source
                        )
                        state['spotify_matches'] += 1
                        state['discovery_results'].append(result)
                        continue
                except Exception as cache_err:
                    # A broken cache must never block live discovery.
                    logger.error(f"Cache lookup error: {cache_err}")

                source_duration = track.get('duration_ms', 0) or 0

                matched_track, best_confidence, best_raw_track = _search_generated_queries(
                    deps, cleaned_title, cleaned_artist, source_duration, use_spotify, itunes_client
                )
                if matched_track:
                    logger.info(f"Strategy 1 YouTube match: {_match_label(matched_track)} (confidence: {best_confidence:.3f})")
                else:
                    matched_track, best_confidence = _search_fallback_strategies(
                        deps, track, cleaned_title, cleaned_artist, source_duration, use_spotify, itunes_client
                    )

                matched_artists = matched_track.artists if matched_track else None
                result = {
                    'index': i,
                    'yt_track': cleaned_title,
                    'yt_artist': cleaned_artist,
                    'status': 'Found' if matched_track else 'Not Found',
                    'status_class': 'found' if matched_track else 'not-found',
                    'spotify_track': matched_track.name if matched_track else '',
                    'spotify_artist': deps.extract_artist_name(matched_artists[0]) if matched_artists else '',
                    'spotify_album': matched_track.album if matched_track else '',
                    'duration': _format_duration(track.get('duration_ms')),
                    'discovery_source': discovery_source,
                    'confidence': best_confidence,
                }

                if matched_track:
                    state['spotify_matches'] += 1

                    result['matched_data'] = _matched_data(
                        matched_track, best_raw_track, use_spotify, discovery_source
                    )
                    result['spotify_data'] = result['matched_data']

                    # Save to discovery cache (only high-confidence matches)
                    if best_confidence >= _CACHE_MIN_CONFIDENCE:
                        try:
                            deps.get_database().save_discovery_cache_match(
                                cache_key[0], cache_key[1], discovery_source, best_confidence,
                                result['matched_data'], cleaned_title, cleaned_artist
                            )
                            logger.info(f"CACHE SAVED: {cleaned_artist} - {cleaned_title} (confidence: {best_confidence:.3f})")
                        except Exception as cache_err:
                            logger.error(f"Cache save error: {cache_err}")
                else:
                    # Auto Wing It fallback — build stub from raw source data
                    stub = deps.build_discovery_wing_it_stub(
                        cleaned_title, cleaned_artist, track.get('duration_ms', 0)
                    )
                    result['status'] = 'Wing It'
                    result['status_class'] = 'wing-it'
                    result['spotify_track'] = cleaned_title
                    result['spotify_artist'] = cleaned_artist
                    result['spotify_album'] = ''
                    result['matched_data'] = stub
                    result['spotify_data'] = stub
                    result['wing_it_fallback'] = True
                    state['wing_it_count'] = state.get('wing_it_count', 0) + 1

                state['discovery_results'].append(result)

                logger.info(f"  Track {i+1}/{total}: {result['status']}")

            except Exception as e:
                # One bad track yields an 'Error' row; the run continues.
                logger.error(f"Error processing track {i}: {e}")
                state['discovery_results'].append(_error_result(i, track))

        # Complete discovery
        state['phase'] = 'discovered'
        state['status'] = 'complete'
        state['discovery_progress'] = 100

        # Sort results by index so array position matches result['index'].
        # Critical after retry where found results are kept at the front
        # and newly-discovered results are appended out of order.
        state['discovery_results'].sort(key=lambda r: r.get('index', 0))

        # Write back discovery results to DB for mirrored playlists
        if url_hash.startswith('mirrored_'):
            _write_back_mirrored_results(deps, url_hash, state, tracks, discovery_source)

        wing_it_count = state.get('wing_it_count', 0)
        try:
            playlist_name = playlist['name']
            source_label = discovery_source.upper()
            activity_msg = f"'{playlist_name}' - {state['spotify_matches']}/{total} tracks found"
            if wing_it_count:
                activity_msg += f", {wing_it_count} wing it"
            deps.add_activity_item("", f"YouTube Discovery Complete ({source_label})", activity_msg, "Now")
        except Exception as feed_err:
            # The feed entry is informational; failing to post it must not
            # flip an already completed discovery back to 'error'/'fresh'.
            logger.error(f"Error adding YouTube discovery activity item: {feed_err}")

        logger.info(f"YouTube discovery complete ({discovery_source}): {state['spotify_matches']}/{total} tracks matched, {wing_it_count} wing it")

    except Exception as e:
        logger.error(f"Error in YouTube discovery worker: {e}", exc_info=True)
        if state is not None:
            state['status'] = 'error'
            state['phase'] = 'fresh'
    finally:
        deps.resume_enrichment_workers(_ew_state, _WORKER_LABEL)
"""Tests for core/discovery/youtube.py — YouTube discovery worker."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

import pytest

from core.discovery import youtube as dy


# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------

@dataclass
class _FakeMatch:
    """Stand-in for a provider track object as read by the worker."""
    id: str = 'spt-1'
    name: str = 'Found Title'
    # default_factory gives every instance its own list.
    artists: list = field(default_factory=lambda: ['Found Artist'])
    album: str = 'Found Album'
    duration_ms: int = 200000
    image_url: str = ''


class _FakeSpotifyClient:
    """Records every search and returns a canned result list."""

    def __init__(self, results=None, authenticated=True):
        self._results = results or []
        self._authenticated = authenticated
        self.search_calls = []

    def is_spotify_authenticated(self):
        return self._authenticated

    def search_tracks(self, query, limit=10):
        self.search_calls.append((query, limit))
        return self._results


class _FakeITunesClient:
    """Fallback client fake; same recording behaviour as the Spotify fake."""

    def __init__(self, results=None):
        self._results = results or []
        self.search_calls = []

    def search_tracks(self, query, limit=10):
        self.search_calls.append((query, limit))
        return self._results


class _FakeMatchingEngine:
    """Produces a single '<artist> <title>' query."""

    def generate_download_queries(self, track):
        return [f"{track.artists[0]} {track.name}"]


class _FakeDB:
    """Captures cache saves and mirrored-track updates for assertions."""

    def __init__(self, cache_match=None):
        self._cache_match = cache_match
        self.cache_saves = []
        self.mirrored_updates = []

    def get_discovery_cache_match(self, title, artist, source):
        return self._cache_match

    def save_discovery_cache_match(self, title, artist, source, conf, data, raw_t, raw_a):
        self.cache_saves.append((title, artist, source, conf))

    def update_mirrored_track_extra_data(self, db_track_id, extra_data):
        self.mirrored_updates.append((db_track_id, extra_data))


class _FakeMetadataCache:
    """Metadata cache that never has the raw entity."""

    def get_entity(self, source, kind, entity_id):
        return None


def _build_deps(
    *,
    states=None,
    spotify_results=None,
    spotify_auth=True,
    itunes_results=None,
    discovery_source='spotify',
    cache_match=None,
    pause_called=None,
    resume_called=None,
    rate_limited=False,
    activity_log=None,
    score_candidates_result=(None, 0.0, 0),
):
    """Assemble a `YoutubeDiscoveryDeps` wired to the fakes above.

    The fakes and call logs are attached to the returned object as
    `_db`, `_spotify`, `_itunes`, `_pause_called`, `_resume_called` and
    `_activity_log` so tests can assert on them.
    """
    pause_called = pause_called if pause_called is not None else []
    resume_called = resume_called if resume_called is not None else []
    activity_log = activity_log if activity_log is not None else []

    db = _FakeDB(cache_match=cache_match)
    spotify = _FakeSpotifyClient(results=spotify_results or [], authenticated=spotify_auth)
    itunes = _FakeITunesClient(results=itunes_results or [])

    deps = dy.YoutubeDiscoveryDeps(
        youtube_playlist_states=states if states is not None else {},
        spotify_client=spotify,
        matching_engine=_FakeMatchingEngine(),
        pause_enrichment_workers=lambda label: (pause_called.append(label) or {'paused': True}),
        resume_enrichment_workers=lambda state, label: resume_called.append((state, label)),
        get_active_discovery_source=lambda: discovery_source,
        get_metadata_fallback_client=lambda: itunes,
        get_discovery_cache_key=lambda title, artist: (title.lower(), artist.lower()),
        validate_discovery_cache_artist=lambda artist, m: True,
        extract_artist_name=lambda a: a if isinstance(a, str) else a.get('name', ''),
        spotify_rate_limited=lambda: rate_limited,
        discovery_score_candidates=lambda *args, **kw: score_candidates_result,
        get_metadata_cache=lambda: _FakeMetadataCache(),
        build_discovery_wing_it_stub=lambda title, artist, dur: {
            'name': title, 'artists': [artist], 'duration_ms': dur, 'wing_it': True
        },
        get_database=lambda: db,
        add_activity_item=lambda *a, **kw: activity_log.append((a, kw)),
    )
    deps._db = db  # expose for test assertions
    deps._spotify = spotify
    deps._itunes = itunes
    deps._pause_called = pause_called
    deps._resume_called = resume_called
    deps._activity_log = activity_log
    return deps


def _seed_state(url_hash, states, *, tracks=None, phase='discovering'):
    """Insert a fresh playlist state for *url_hash* into *states*."""
    states[url_hash] = {
        'phase': phase,
        'playlist': {'name': 'Test Playlist', 'tracks': tracks or []},
        'discovery_progress': 0,
        'spotify_matches': 0,
        'discovery_results': [],
    }


def _track(name='Track1', artist='Artist1', duration_ms=180000):
    """Build a minimal YouTube track dict."""
    return {'name': name, 'artists': [artist], 'duration_ms': duration_ms,
            'raw_title': name, 'raw_artist': artist}


def _raise_runtime_error():
    """Dependency stub that always fails."""
    raise RuntimeError('discovery source unavailable')


# ---------------------------------------------------------------------------
# Cache path
# ---------------------------------------------------------------------------

def test_cache_hit_skips_search():
    """Cache hit short-circuits search and appends found result."""
    states = {}
    cached = {
        'name': 'Cached Title',
        'artists': ['Cached Artist'],
        'album': {'name': 'Cached Album'},
    }
    _seed_state('h1', states, tracks=[_track()])
    deps = _build_deps(states=states, cache_match=cached)

    dy.run_youtube_discovery_worker('h1', deps)

    state = states['h1']
    assert state['spotify_matches'] == 1
    assert state['discovery_results'][0]['status'] == 'Found'
    assert state['discovery_results'][0]['spotify_track'] == 'Cached Title'
    assert deps._spotify.search_calls == []  # no live search


# ---------------------------------------------------------------------------
# Match path (Strategy 1)
# ---------------------------------------------------------------------------

def test_strategy1_match_above_threshold():
    """Strategy 1 returns match with confidence >= 0.9 → recorded as Found."""
    states = {}
    match = _FakeMatch()
    _seed_state('h2', states, tracks=[_track()])
    deps = _build_deps(states=states, spotify_results=[match],
                       score_candidates_result=(match, 0.95, 0))

    dy.run_youtube_discovery_worker('h2', deps)

    state = states['h2']
    assert state['spotify_matches'] == 1
    assert state['discovery_results'][0]['status'] == 'Found'
    assert state['discovery_results'][0]['confidence'] == 0.95
    assert deps._db.cache_saves  # match cached


# ---------------------------------------------------------------------------
# Wing It fallback
# ---------------------------------------------------------------------------

def test_no_match_triggers_wing_it_fallback():
    """No match in any strategy → Wing It stub created."""
    states = {}
    _seed_state('h3', states, tracks=[_track()])
    deps = _build_deps(states=states,
                       score_candidates_result=(None, 0.0, 0))

    dy.run_youtube_discovery_worker('h3', deps)

    state = states['h3']
    assert state.get('wing_it_count') == 1
    result = state['discovery_results'][0]
    assert result['status'] == 'Wing It'
    assert result['wing_it_fallback'] is True
    assert result['matched_data'].get('wing_it') is True


# ---------------------------------------------------------------------------
# iTunes fallback path
# ---------------------------------------------------------------------------

def test_itunes_fallback_when_spotify_unauthenticated():
    """When Spotify not authenticated, iTunes client searched instead."""
    states = {}
    match = _FakeMatch()
    _seed_state('h4', states, tracks=[_track()])
    deps = _build_deps(states=states, spotify_auth=False,
                       itunes_results=[match],
                       discovery_source='itunes',
                       score_candidates_result=(match, 0.92, 0))

    dy.run_youtube_discovery_worker('h4', deps)

    assert deps._itunes.search_calls
    assert deps._spotify.search_calls == []  # spotify NOT called


def test_spotify_skipped_when_rate_limited():
    """Spotify globally rate-limited → falls through to iTunes."""
    states = {}
    match = _FakeMatch()
    _seed_state('h5', states, tracks=[_track()])
    deps = _build_deps(states=states, rate_limited=True,
                       itunes_results=[match],
                       score_candidates_result=(match, 0.95, 0))

    dy.run_youtube_discovery_worker('h5', deps)

    assert deps._itunes.search_calls
    # Strategy 1 matches through iTunes, so no later strategy runs and
    # Spotify is never queried.
    assert deps._spotify.search_calls == []


# ---------------------------------------------------------------------------
# Cancellation
# ---------------------------------------------------------------------------

def test_phase_changed_cancels_loop():
    """Phase other than 'discovering' → worker exits before any track."""
    states = {}
    _seed_state('h6', states, tracks=[_track(), _track('T2', 'A2')], phase='cancelled')
    deps = _build_deps(states=states)

    dy.run_youtube_discovery_worker('h6', deps)

    # Loop bails on first iteration (phase != 'discovering')
    assert states['h6']['discovery_results'] == []
    assert states['h6']['phase'] == 'cancelled'  # not overwritten on exit
    assert deps._resume_called  # early return still resumes workers


def test_skip_discovery_flag_skips_track():
    """Track flagged with skip_discovery is skipped during loop."""
    states = {}
    tr = _track()
    tr['skip_discovery'] = True
    _seed_state('h7', states, tracks=[tr])
    deps = _build_deps(states=states)

    dy.run_youtube_discovery_worker('h7', deps)

    assert states['h7']['discovery_results'] == []
    assert states['h7']['phase'] == 'discovered'  # completes normally


# ---------------------------------------------------------------------------
# Completion: phase + activity feed
# ---------------------------------------------------------------------------

def test_completion_marks_phase_discovered():
    """All tracks processed → phase='discovered', status='complete', progress=100."""
    states = {}
    _seed_state('h8', states, tracks=[_track()])
    deps = _build_deps(states=states)

    dy.run_youtube_discovery_worker('h8', deps)

    assert states['h8']['phase'] == 'discovered'
    assert states['h8']['status'] == 'complete'
    assert states['h8']['discovery_progress'] == 100


def test_activity_feed_logged_on_completion():
    """Discovery completion appends an activity feed item."""
    states = {}
    _seed_state('h9', states, tracks=[_track()])
    deps = _build_deps(states=states)

    dy.run_youtube_discovery_worker('h9', deps)

    assert len(deps._activity_log) == 1
    args, _ = deps._activity_log[0]
    title, msg = args[1], args[2]
    assert 'YouTube Discovery Complete' in title
    assert 'Test Playlist' in msg


# ---------------------------------------------------------------------------
# Mirrored playlist DB writeback
# ---------------------------------------------------------------------------

def test_mirrored_playlist_writes_to_db():
    """url_hash starting with 'mirrored_' → discovery results written to DB."""
    states = {}
    tr = _track()
    tr['db_track_id'] = 'dbtid-1'
    _seed_state('mirrored_xyz', states, tracks=[tr])
    deps = _build_deps(states=states)

    dy.run_youtube_discovery_worker('mirrored_xyz', deps)

    assert len(deps._db.mirrored_updates) == 1
    db_track_id, extra = deps._db.mirrored_updates[0]
    assert db_track_id == 'dbtid-1'
    # Wing It (no match) → discovered=True with provider='wing_it_fallback'
    assert extra['discovered'] is True
    assert extra['wing_it_fallback'] is True
    assert extra['provider'] == 'wing_it_fallback'


def test_non_mirrored_playlist_no_db_writeback():
    """Non-mirrored url_hash → no DB writeback."""
    states = {}
    _seed_state('regular_hash', states, tracks=[_track()])
    deps = _build_deps(states=states)

    dy.run_youtube_discovery_worker('regular_hash', deps)

    assert deps._db.mirrored_updates == []


# ---------------------------------------------------------------------------
# Enrichment workers pause/resume
# ---------------------------------------------------------------------------

def test_enrichment_workers_paused_and_resumed():
    """Worker pauses enrichment workers on entry, resumes in finally."""
    states = {}
    _seed_state('h10', states, tracks=[_track()])
    deps = _build_deps(states=states)

    dy.run_youtube_discovery_worker('h10', deps)

    assert deps._pause_called == ['YouTube discovery']
    assert deps._resume_called  # resume called regardless


def test_error_during_loop_resets_phase_to_fresh():
    """Matching-engine failure inside the loop is absorbed; the run completes.

    NOTE: the name is historical. A broken matching engine does not reset
    the phase: the worker falls back to basic queries, finds nothing and
    records a Wing It result. The reset-to-fresh path is exercised by
    `test_error_outside_track_loop_resets_phase_to_fresh`.
    """
    states = {}
    _seed_state('herr', states, tracks=[_track()])
    deps = _build_deps(states=states)
    # Force matching engine to raise during iteration
    deps.matching_engine = None  # AttributeError on .generate_download_queries

    dy.run_youtube_discovery_worker('herr', deps)

    assert states['herr']['phase'] == 'discovered'
    assert states['herr']['discovery_results'][0]['status'] == 'Wing It'
    assert deps._resume_called


def test_error_outside_track_loop_resets_phase_to_fresh():
    """Error after state lookup but outside the loop → phase='fresh', status='error'."""
    states = {}
    _seed_state('hfail', states, tracks=[_track()])
    deps = _build_deps(states=states)
    deps.get_active_discovery_source = _raise_runtime_error

    dy.run_youtube_discovery_worker('hfail', deps)

    assert states['hfail']['status'] == 'error'
    assert states['hfail']['phase'] == 'fresh'
    assert states['hfail']['discovery_results'] == []
    assert deps._resume_called  # finally still runs


# ---------------------------------------------------------------------------
# Sort by index
# ---------------------------------------------------------------------------

def test_results_sorted_by_index():
    """discovery_results sorted by index after completion (retry parity)."""
    states = {}
    tracks = [_track(f'T{i}', f'A{i}') for i in range(3)]
    _seed_state('h11', states, tracks=tracks)
    # Pre-populate out-of-order results to verify sort
    states['h11']['discovery_results'] = [
        {'index': 2, 'status': 'pre'},
        {'index': 0, 'status': 'pre'},
    ]
    deps = _build_deps(states=states)

    dy.run_youtube_discovery_worker('h11', deps)

    indices = [r['index'] for r in states['h11']['discovery_results']]
    assert indices == sorted(indices)
(discovery_source == 'spotify') and spotify_client and spotify_client.is_spotify_authenticated() - - # Get fallback client - itunes_client = _get_metadata_fallback_client() - - logger.info(f"Starting {discovery_source} discovery for {len(tracks)} YouTube tracks...") - - # Store the discovery source in state - state['discovery_source'] = discovery_source - - # Process each track for discovery - for i, track in enumerate(tracks): - try: - # Check for cancellation (phase changed by reset/delete/close) - if state.get('phase') != 'discovering': - logger.warning(f"Discovery cancelled for {url_hash} (phase changed to '{state.get('phase')}')") - return - - # Update progress - state['discovery_progress'] = int((i / len(tracks)) * 100) - - # Skip tracks flagged by retry (already found) - if track.get('skip_discovery'): - continue - - # Search for track using active provider - cleaned_title = track['name'] - cleaned_artist = track['artists'][0] if track['artists'] else 'Unknown Artist' - - logger.info(f"Searching {discovery_source} for: '{cleaned_artist}' - '{cleaned_title}'") - - # Check discovery cache first - cache_key = _get_discovery_cache_key(cleaned_title, cleaned_artist) - try: - cache_db = get_database() - cached_match = cache_db.get_discovery_cache_match(cache_key[0], cache_key[1], discovery_source) - if cached_match and _validate_discovery_cache_artist(cleaned_artist, cached_match): - logger.debug(f"CACHE HIT [{i+1}/{len(tracks)}]: {cleaned_artist} - {cleaned_title}") - result = { - 'index': i, - 'yt_track': cleaned_title, - 'yt_artist': cleaned_artist, - 'status': 'Found', - 'status_class': 'found', - 'spotify_track': cached_match.get('name', ''), - 'spotify_artist': _extract_artist_name(cached_match.get('artists', [''])[0]) if cached_match.get('artists') else '', - 'spotify_album': cached_match.get('album', {}).get('name', '') if isinstance(cached_match.get('album'), dict) else cached_match.get('album', ''), - 'duration': f"{track['duration_ms'] // 
60000}:{(track['duration_ms'] % 60000) // 1000:02d}" if track['duration_ms'] else '0:00', - 'discovery_source': discovery_source, - 'matched_data': cached_match, - 'spotify_data': cached_match - } - state['spotify_matches'] += 1 - state['discovery_results'].append(result) - continue - except Exception as cache_err: - logger.error(f"Cache lookup error: {cache_err}") - - # Try multiple search strategies using matching engine - matched_track = None - best_confidence = 0.0 - best_raw_track = None - min_confidence = 0.9 - source_duration = track.get('duration_ms', 0) or 0 - - # Strategy 1: Use matching_engine search queries - try: - temp_track = type('TempTrack', (), { - 'name': cleaned_title, - 'artists': [cleaned_artist], - 'album': None - })() - search_queries = matching_engine.generate_download_queries(temp_track) - logger.info(f"Generated {len(search_queries)} search queries for YouTube track") - except Exception as e: - logger.error(f"Matching engine failed for YouTube, falling back to basic query: {e}") - search_queries = [f"{cleaned_artist} {cleaned_title}", cleaned_title] - - for query_idx, search_query in enumerate(search_queries): - try: - logger.debug(f"YouTube query {query_idx + 1}/{len(search_queries)}: {search_query}") - - search_results = None - - if use_spotify and not _spotify_rate_limited(): - search_results = spotify_client.search_tracks(search_query, limit=10) - else: - search_results = itunes_client.search_tracks(search_query, limit=10) - - if not search_results: - continue - - # Score all results using the matching engine - match, confidence, match_idx = _discovery_score_candidates( - cleaned_title, cleaned_artist, source_duration, search_results - ) - - if match and confidence > best_confidence and confidence >= min_confidence: - best_confidence = confidence - matched_track = match - if use_spotify and match.id: - _cache = get_metadata_cache() - best_raw_track = _cache.get_entity('spotify', 'track', match.id) - else: - best_raw_track = None - 
logger.info(f"New best YouTube match: {match.artists[0]} - {match.name} (confidence: {confidence:.3f})") - - if best_confidence >= 0.9: - logger.info(f"High confidence YouTube match found ({best_confidence:.3f}), stopping search") - break - - except Exception as e: - logger.debug(f"Error in YouTube search for query '{search_query}': {e}") - continue - - if matched_track: - logger.info(f"Strategy 1 YouTube match: {matched_track.artists[0]} - {matched_track.name} (confidence: {best_confidence:.3f})") - - # Strategy 2: Swapped search (if first failed) - score results properly - if not matched_track: - logger.info("YouTube Strategy 2: Trying swapped search (artist/title reversed)") - if use_spotify: - query = f"artist:{cleaned_title} track:{cleaned_artist}" - fallback_results = spotify_client.search_tracks(query, limit=5) - else: - query = f"{cleaned_title} {cleaned_artist}" - fallback_results = itunes_client.search_tracks(query, limit=5) - if fallback_results: - match, confidence, _ = _discovery_score_candidates( - cleaned_title, cleaned_artist, source_duration, fallback_results - ) - if match and confidence >= min_confidence: - matched_track = match - best_confidence = confidence - logger.info(f"Strategy 2 YouTube match (swapped): {match.artists[0]} - {match.name} (confidence: {confidence:.3f})") - - # Strategy 3: Raw data search (if still failed) - score results properly - if not matched_track: - raw_title = track.get('raw_title', cleaned_title) - raw_artist = track.get('raw_artist', cleaned_artist) - logger.info(f"YouTube Strategy 3: Trying raw data search: '{raw_artist} {raw_title}'") - query = f"{raw_artist} {raw_title}" - if use_spotify: - fallback_results = spotify_client.search_tracks(query, limit=5) - else: - fallback_results = itunes_client.search_tracks(query, limit=5) - if fallback_results: - match, confidence, _ = _discovery_score_candidates( - cleaned_title, cleaned_artist, source_duration, fallback_results - ) - if match and confidence >= 
min_confidence: - matched_track = match - best_confidence = confidence - logger.info(f"Strategy 3 YouTube match (raw): {match.artists[0]} - {match.name} (confidence: {confidence:.3f})") - - # Strategy 4: Extended search with higher limit (last resort) - if not matched_track: - logger.info("YouTube Strategy 4: Extended search with limit=50") - query = f"{cleaned_artist} {cleaned_title}" - if use_spotify: - extended_results = spotify_client.search_tracks(query, limit=50) - else: - extended_results = itunes_client.search_tracks(query, limit=50) - if extended_results: - match, confidence, _ = _discovery_score_candidates( - cleaned_title, cleaned_artist, source_duration, extended_results - ) - if match and confidence >= min_confidence: - matched_track = match - best_confidence = confidence - logger.info(f"Strategy 4 YouTube match (extended): {match.artists[0]} - {match.name} (confidence: {confidence:.3f})") - - # Create result entry - result = { - 'index': i, - 'yt_track': cleaned_title, - 'yt_artist': cleaned_artist, - 'status': 'Found' if matched_track else 'Not Found', - 'status_class': 'found' if matched_track else 'not-found', - 'spotify_track': matched_track.name if matched_track else '', - 'spotify_artist': _extract_artist_name(matched_track.artists[0]) if matched_track else '', - 'spotify_album': matched_track.album if matched_track else '', - 'duration': f"{track['duration_ms'] // 60000}:{(track['duration_ms'] % 60000) // 1000:02d}" if track['duration_ms'] else '0:00', - 'discovery_source': discovery_source, - 'confidence': best_confidence - } - - if matched_track: - state['spotify_matches'] += 1 - - # Build album data based on provider - if use_spotify and best_raw_track: - album_data = best_raw_track.get('album', {}) - else: - album_data = { - 'name': matched_track.album, - 'album_type': 'album', - 'release_date': getattr(matched_track, 'release_date', '') or '', - 'images': [{'url': matched_track.image_url}] if hasattr(matched_track, 'image_url') and 
matched_track.image_url else [] - } - - # Extract image URL for discovery pool display - _yt_album_images = album_data.get('images', []) - _yt_image_url = _yt_album_images[0].get('url', '') if _yt_album_images else (getattr(matched_track, 'image_url', '') or '') - - result['matched_data'] = { - 'id': matched_track.id, - 'name': matched_track.name, - 'artists': matched_track.artists, - 'album': album_data, - 'duration_ms': matched_track.duration_ms, - 'image_url': _yt_image_url, - 'source': discovery_source - } - result['spotify_data'] = result['matched_data'] - - # Save to discovery cache (only high-confidence matches) - if best_confidence >= 0.7: - try: - cache_db = get_database() - cache_db.save_discovery_cache_match( - cache_key[0], cache_key[1], discovery_source, best_confidence, - result['matched_data'], cleaned_title, cleaned_artist - ) - logger.info(f"CACHE SAVED: {cleaned_artist} - {cleaned_title} (confidence: {best_confidence:.3f})") - except Exception as cache_err: - logger.error(f"Cache save error: {cache_err}") - - else: - # Auto Wing It fallback — build stub from raw source data - stub = _build_discovery_wing_it_stub(cleaned_title, cleaned_artist, track.get('duration_ms', 0)) - result['status'] = 'Wing It' - result['status_class'] = 'wing-it' - result['spotify_track'] = cleaned_title - result['spotify_artist'] = cleaned_artist - result['spotify_album'] = '' - result['matched_data'] = stub - result['spotify_data'] = stub - result['wing_it_fallback'] = True - state['wing_it_count'] = state.get('wing_it_count', 0) + 1 - - state['discovery_results'].append(result) +# YouTube discovery worker logic lives in core/discovery/youtube.py. 
+from core.discovery import youtube as _discovery_youtube - logger.info(f" {'' if matched_track else ''} Track {i+1}/{len(tracks)}: {result['status']}") - except Exception as e: - logger.error(f"Error processing track {i}: {e}") - result = { - 'index': i, - 'yt_track': track['name'], - 'yt_artist': track['artists'][0] if track['artists'] else 'Unknown', - 'status': 'Error', - 'status_class': 'error', - 'spotify_track': '', - 'spotify_artist': '', - 'spotify_album': '', - 'duration': '0:00' - } - state['discovery_results'].append(result) - - # Complete discovery - state['phase'] = 'discovered' - state['status'] = 'complete' - state['discovery_progress'] = 100 - - # Sort results by index so array position matches result['index']. - # Critical after retry where found results are kept at the front - # and newly-discovered results are appended out of order. - state['discovery_results'].sort(key=lambda r: r.get('index', 0)) - - # Write back discovery results to DB for mirrored playlists - if url_hash.startswith('mirrored_'): - try: - db = get_database() - for result in state['discovery_results']: - idx = result.get('index', -1) - if idx < 0 or idx >= len(tracks): - continue - db_track_id = tracks[idx].get('db_track_id') - if not db_track_id: - continue - if result.get('status_class') in ('found', 'wing-it') and result.get('matched_data'): - extra_data = { - 'discovered': True, - 'provider': result.get('discovery_source', discovery_source), - 'confidence': result.get('confidence', 0), - 'matched_data': result['matched_data'], - } - if result.get('manual_match'): - extra_data['manual_match'] = True - if result.get('wing_it_fallback'): - extra_data['wing_it_fallback'] = True - extra_data['provider'] = 'wing_it_fallback' - db.update_mirrored_track_extra_data(db_track_id, extra_data) - else: - extra_data = { - 'discovered': False, - 'discovery_attempted': True, - 'provider': discovery_source, - } - db.update_mirrored_track_extra_data(db_track_id, extra_data) - 
def _build_youtube_discovery_deps():
    """Assemble a fresh ``YoutubeDiscoveryDeps`` bundle from this module's globals.

    Each field is looked up in the module namespace when this function is
    called, so the returned bundle reflects whatever those globals are bound
    to at that moment.
    """
    deps_cls = _discovery_youtube.YoutubeDiscoveryDeps
    wiring = {
        # Shared state and service objects.
        'youtube_playlist_states': youtube_playlist_states,
        'spotify_client': spotify_client,
        'matching_engine': matching_engine,
        # Enrichment worker pause/resume hooks.
        'pause_enrichment_workers': _pause_enrichment_workers,
        'resume_enrichment_workers': _resume_enrichment_workers,
        # Provider selection and fallback client lookup.
        'get_active_discovery_source': _get_active_discovery_source,
        'get_metadata_fallback_client': _get_metadata_fallback_client,
        # Discovery cache and matching helpers.
        'get_discovery_cache_key': _get_discovery_cache_key,
        'validate_discovery_cache_artist': _validate_discovery_cache_artist,
        'extract_artist_name': _extract_artist_name,
        'spotify_rate_limited': _spotify_rate_limited,
        'discovery_score_candidates': _discovery_score_candidates,
        'get_metadata_cache': get_metadata_cache,
        'build_discovery_wing_it_stub': _build_discovery_wing_it_stub,
        # Database access and activity feed.
        'get_database': get_database,
        'add_activity_item': add_activity_item,
    }
    return deps_cls(**wiring)


def _run_youtube_discovery_worker(url_hash):
    """Run the extracted YouTube discovery worker for ``url_hash``.

    Builds a new deps bundle for this call and returns whatever
    ``core.discovery.youtube.run_youtube_discovery_worker`` returns.
    """
    worker = _discovery_youtube.run_youtube_discovery_worker
    deps = _build_youtube_discovery_deps()
    return worker(url_hash, deps)
discovery') def _run_listenbrainz_discovery_worker(state_key): """Background worker for ListenBrainz music discovery process (Spotify preferred, iTunes fallback)"""