Merge pull request #405 from Nezreka/refactor/lift-discovery-youtube

PR5b: lift _run_youtube_discovery_worker to core/discovery/youtube.py
pull/406/head
BoulderBadgeDad 4 weeks ago committed by GitHub
commit d15777ea99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,388 @@
"""Background worker for YouTube playlist discovery.
`run_youtube_discovery_worker(url_hash, deps)` is the function
`youtube_discovery_executor.submit(...)` invokes to match each YouTube
playlist track against Spotify (preferred) or iTunes (fallback):
1. Pause enrichment workers (release shared resources).
2. For each YouTube track:
- Check discovery cache; cache hit short-circuits the search.
- Strategy 1: matching_engine search queries with confidence scoring.
- Strategy 2: swapped artist/title query.
- Strategy 3: raw (untokenized) query.
- Strategy 4: extended search with limit=50.
- On match save to discovery cache.
- On miss build a Wing It stub from raw source data.
3. After all tracks: mark phase 'discovered', sort results by index, and
for mirrored playlists write extra_data back to the DB.
4. Activity feed entry with match summary.
5. On error state['status'] = 'error', phase reset to 'fresh'.
6. Finally: resume enrichment workers.
Lifted verbatim from web_server.py. Wide dependency surface (Spotify and
iTunes clients, matching engine, multiple metadata helpers, state dicts,
database access) all injected via `YoutubeDiscoveryDeps`.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any, Callable
logger = logging.getLogger(__name__)
@dataclass
class YoutubeDiscoveryDeps:
    """Everything `run_youtube_discovery_worker` needs from the host application.

    The worker reads all of its collaborators from one instance of this
    class instead of module globals. Field order is part of the positional
    constructor signature and must not change.
    """

    # --- shared state and service objects ---------------------------------
    # Mapping indexed by url_hash; the worker mutates the selected entry.
    youtube_playlist_states: dict
    # Used via .is_spotify_authenticated() and .search_tracks(query, limit=...).
    spotify_client: Any
    # Used via .generate_download_queries(track_like).
    matching_engine: Any

    # --- enrichment worker control -----------------------------------------
    # Called with a label; its return value is handed back to the resume hook.
    pause_enrichment_workers: Callable[[str], dict]
    resume_enrichment_workers: Callable[[dict, str], None]

    # --- provider selection -------------------------------------------------
    # Compared against the literal 'spotify' to pick the search client.
    get_active_discovery_source: Callable[[], str]
    # Returns the non-Spotify client; used via .search_tracks(query, limit=...).
    get_metadata_fallback_client: Callable[[], Any]

    # --- discovery cache + matching helpers ----------------------------------
    # Called as (title, artist); elements [0] and [1] of the result are used.
    get_discovery_cache_key: Callable
    # Called as (artist, cached_match); truthy result accepts the cache hit.
    validate_discovery_cache_artist: Callable
    extract_artist_name: Callable
    spotify_rate_limited: Callable[[], bool]
    # Called as (title, artist, duration_ms, candidates); the result is
    # unpacked into three values, of which the first two are used as
    # (match, confidence).
    discovery_score_candidates: Callable
    # Returned object is used via .get_entity('spotify', 'track', track_id).
    get_metadata_cache: Callable[[], Any]
    # Called as (title, artist, duration_ms) when no strategy finds a match.
    build_discovery_wing_it_stub: Callable

    # --- persistence + activity feed -----------------------------------------
    get_database: Callable[[], Any]
    add_activity_item: Callable
def _format_duration_ms(duration_ms):
    """Render a millisecond duration as ``M:SS``; falsy input gives ``'0:00'``."""
    if not duration_ms:
        return '0:00'
    return f"{duration_ms // 60000}:{(duration_ms % 60000) // 1000:02d}"


def run_youtube_discovery_worker(url_hash, deps: YoutubeDiscoveryDeps):
    """Match every track of a YouTube playlist against the active metadata provider.

    Args:
        url_hash: Key into ``deps.youtube_playlist_states``. When it starts
            with ``'mirrored_'`` the per-track outcome is also written back
            through ``deps.get_database()``.
        deps: Bundle of injected collaborators (clients, helpers, state).

    Returns:
        None. All output is written into the playlist state dict
        (``discovery_results``, ``spotify_matches``, ``wing_it_count``,
        ``phase``, ``status``, ``discovery_progress``, ``discovery_source``)
        and into the activity feed.

    Exceptions never escape: a failure outside the per-track handler marks
    the state as ``status='error'`` / ``phase='fresh'`` (when the state was
    resolved at all), and the enrichment workers are always resumed.
    """
    _ew_state = {}
    # Bound up front so the outer handler can tell "state never resolved"
    # (pause hook or url_hash lookup failed) from "failed while running".
    state = None
    try:
        _ew_state = deps.pause_enrichment_workers('YouTube discovery')
        state = deps.youtube_playlist_states[url_hash]
        playlist = state['playlist']
        tracks = playlist['tracks']

        # Determine which provider to use (Spotify preferred, iTunes fallback)
        discovery_source = deps.get_active_discovery_source()
        use_spotify = (discovery_source == 'spotify') and deps.spotify_client and deps.spotify_client.is_spotify_authenticated()

        # Get fallback client
        itunes_client = deps.get_metadata_fallback_client()
        logger.info(f"Starting {discovery_source} discovery for {len(tracks)} YouTube tracks...")

        # Store the discovery source in state
        state['discovery_source'] = discovery_source

        # Process each track for discovery
        for i, track in enumerate(tracks):
            try:
                # Check for cancellation (phase changed by reset/delete/close)
                if state.get('phase') != 'discovering':
                    logger.warning(f"Discovery cancelled for {url_hash} (phase changed to '{state.get('phase')}')")
                    return

                # Update progress
                state['discovery_progress'] = int((i / len(tracks)) * 100)

                # Skip tracks flagged by retry (already found)
                if track.get('skip_discovery'):
                    continue

                # Search for track using active provider
                cleaned_title = track['name']
                cleaned_artist = track['artists'][0] if track['artists'] else 'Unknown Artist'
                logger.info(f"Searching {discovery_source} for: '{cleaned_artist}' - '{cleaned_title}'")

                # A missing 'duration_ms' key is treated like an unknown
                # duration instead of failing the whole track.
                duration_label = _format_duration_ms(track.get('duration_ms'))

                # Check discovery cache first
                cache_key = deps.get_discovery_cache_key(cleaned_title, cleaned_artist)
                try:
                    cache_db = deps.get_database()
                    cached_match = cache_db.get_discovery_cache_match(cache_key[0], cache_key[1], discovery_source)
                    if cached_match and deps.validate_discovery_cache_artist(cleaned_artist, cached_match):
                        logger.debug(f"CACHE HIT [{i+1}/{len(tracks)}]: {cleaned_artist} - {cleaned_title}")
                        result = {
                            'index': i,
                            'yt_track': cleaned_title,
                            'yt_artist': cleaned_artist,
                            'status': 'Found',
                            'status_class': 'found',
                            'spotify_track': cached_match.get('name', ''),
                            'spotify_artist': deps.extract_artist_name(cached_match.get('artists', [''])[0]) if cached_match.get('artists') else '',
                            'spotify_album': cached_match.get('album', {}).get('name', '') if isinstance(cached_match.get('album'), dict) else cached_match.get('album', ''),
                            'duration': duration_label,
                            'discovery_source': discovery_source,
                            'matched_data': cached_match,
                            'spotify_data': cached_match
                        }
                        state['spotify_matches'] += 1
                        state['discovery_results'].append(result)
                        continue
                except Exception as cache_err:
                    # Best effort: a broken cache must not block a live search.
                    logger.error(f"Cache lookup error: {cache_err}")

                # Try multiple search strategies using matching engine
                matched_track = None
                best_confidence = 0.0
                best_raw_track = None
                min_confidence = 0.9
                source_duration = track.get('duration_ms', 0) or 0

                # Strategy 1: Use matching_engine search queries
                try:
                    # Ad-hoc object exposing the attributes set below for
                    # generate_download_queries to read.
                    temp_track = type('TempTrack', (), {
                        'name': cleaned_title,
                        'artists': [cleaned_artist],
                        'album': None
                    })()
                    search_queries = deps.matching_engine.generate_download_queries(temp_track)
                    logger.info(f"Generated {len(search_queries)} search queries for YouTube track")
                except Exception as e:
                    logger.error(f"Matching engine failed for YouTube, falling back to basic query: {e}")
                    search_queries = [f"{cleaned_artist} {cleaned_title}", cleaned_title]

                for query_idx, search_query in enumerate(search_queries):
                    try:
                        logger.debug(f"YouTube query {query_idx + 1}/{len(search_queries)}: {search_query}")
                        search_results = None
                        if use_spotify and not deps.spotify_rate_limited():
                            search_results = deps.spotify_client.search_tracks(search_query, limit=10)
                        else:
                            search_results = itunes_client.search_tracks(search_query, limit=10)
                        if not search_results:
                            continue

                        # Score all results using the matching engine
                        match, confidence, _ = deps.discovery_score_candidates(
                            cleaned_title, cleaned_artist, source_duration, search_results
                        )
                        if match and confidence > best_confidence and confidence >= min_confidence:
                            best_confidence = confidence
                            matched_track = match
                            if use_spotify and match.id:
                                _cache = deps.get_metadata_cache()
                                best_raw_track = _cache.get_entity('spotify', 'track', match.id)
                            else:
                                best_raw_track = None
                            logger.info(f"New best YouTube match: {match.artists[0]} - {match.name} (confidence: {confidence:.3f})")
                        if best_confidence >= 0.9:
                            logger.info(f"High confidence YouTube match found ({best_confidence:.3f}), stopping search")
                            break
                    except Exception as e:
                        logger.debug(f"Error in YouTube search for query '{search_query}': {e}")
                        continue

                if matched_track:
                    logger.info(f"Strategy 1 YouTube match: {matched_track.artists[0]} - {matched_track.name} (confidence: {best_confidence:.3f})")

                # NOTE(review): strategies 2-4 do not consult
                # deps.spotify_rate_limited() the way Strategy 1 does —
                # confirm whether that is intentional.

                # Strategy 2: Swapped search (if first failed) - score results properly
                if not matched_track:
                    logger.info("YouTube Strategy 2: Trying swapped search (artist/title reversed)")
                    if use_spotify:
                        query = f"artist:{cleaned_title} track:{cleaned_artist}"
                        fallback_results = deps.spotify_client.search_tracks(query, limit=5)
                    else:
                        query = f"{cleaned_title} {cleaned_artist}"
                        fallback_results = itunes_client.search_tracks(query, limit=5)
                    if fallback_results:
                        match, confidence, _ = deps.discovery_score_candidates(
                            cleaned_title, cleaned_artist, source_duration, fallback_results
                        )
                        if match and confidence >= min_confidence:
                            matched_track = match
                            best_confidence = confidence
                            logger.info(f"Strategy 2 YouTube match (swapped): {match.artists[0]} - {match.name} (confidence: {confidence:.3f})")

                # Strategy 3: Raw data search (if still failed) - score results properly
                if not matched_track:
                    raw_title = track.get('raw_title', cleaned_title)
                    raw_artist = track.get('raw_artist', cleaned_artist)
                    logger.info(f"YouTube Strategy 3: Trying raw data search: '{raw_artist} {raw_title}'")
                    query = f"{raw_artist} {raw_title}"
                    if use_spotify:
                        fallback_results = deps.spotify_client.search_tracks(query, limit=5)
                    else:
                        fallback_results = itunes_client.search_tracks(query, limit=5)
                    if fallback_results:
                        match, confidence, _ = deps.discovery_score_candidates(
                            cleaned_title, cleaned_artist, source_duration, fallback_results
                        )
                        if match and confidence >= min_confidence:
                            matched_track = match
                            best_confidence = confidence
                            logger.info(f"Strategy 3 YouTube match (raw): {match.artists[0]} - {match.name} (confidence: {confidence:.3f})")

                # Strategy 4: Extended search with higher limit (last resort)
                if not matched_track:
                    logger.info("YouTube Strategy 4: Extended search with limit=50")
                    query = f"{cleaned_artist} {cleaned_title}"
                    if use_spotify:
                        extended_results = deps.spotify_client.search_tracks(query, limit=50)
                    else:
                        extended_results = itunes_client.search_tracks(query, limit=50)
                    if extended_results:
                        match, confidence, _ = deps.discovery_score_candidates(
                            cleaned_title, cleaned_artist, source_duration, extended_results
                        )
                        if match and confidence >= min_confidence:
                            matched_track = match
                            best_confidence = confidence
                            logger.info(f"Strategy 4 YouTube match (extended): {match.artists[0]} - {match.name} (confidence: {confidence:.3f})")

                # Create result entry
                result = {
                    'index': i,
                    'yt_track': cleaned_title,
                    'yt_artist': cleaned_artist,
                    'status': 'Found' if matched_track else 'Not Found',
                    'status_class': 'found' if matched_track else 'not-found',
                    'spotify_track': matched_track.name if matched_track else '',
                    'spotify_artist': deps.extract_artist_name(matched_track.artists[0]) if matched_track else '',
                    'spotify_album': matched_track.album if matched_track else '',
                    'duration': duration_label,
                    'discovery_source': discovery_source,
                    'confidence': best_confidence
                }

                if matched_track:
                    state['spotify_matches'] += 1

                    # Build album data based on provider
                    if use_spotify and best_raw_track:
                        album_data = best_raw_track.get('album', {})
                    else:
                        album_data = {
                            'name': matched_track.album,
                            'album_type': 'album',
                            'release_date': getattr(matched_track, 'release_date', '') or '',
                            'images': [{'url': matched_track.image_url}] if hasattr(matched_track, 'image_url') and matched_track.image_url else []
                        }

                    # Extract image URL for discovery pool display
                    _yt_album_images = album_data.get('images', [])
                    _yt_image_url = _yt_album_images[0].get('url', '') if _yt_album_images else (getattr(matched_track, 'image_url', '') or '')

                    result['matched_data'] = {
                        'id': matched_track.id,
                        'name': matched_track.name,
                        'artists': matched_track.artists,
                        'album': album_data,
                        'duration_ms': matched_track.duration_ms,
                        'image_url': _yt_image_url,
                        'source': discovery_source
                    }
                    result['spotify_data'] = result['matched_data']

                    # Save to discovery cache (only high-confidence matches).
                    # Every accepted match already has confidence >=
                    # min_confidence (0.9), so this guard currently always passes.
                    if best_confidence >= 0.7:
                        try:
                            cache_db = deps.get_database()
                            cache_db.save_discovery_cache_match(
                                cache_key[0], cache_key[1], discovery_source, best_confidence,
                                result['matched_data'], cleaned_title, cleaned_artist
                            )
                            logger.info(f"CACHE SAVED: {cleaned_artist} - {cleaned_title} (confidence: {best_confidence:.3f})")
                        except Exception as cache_err:
                            logger.error(f"Cache save error: {cache_err}")
                else:
                    # Auto Wing It fallback — build stub from raw source data
                    stub = deps.build_discovery_wing_it_stub(cleaned_title, cleaned_artist, track.get('duration_ms', 0))
                    result['status'] = 'Wing It'
                    result['status_class'] = 'wing-it'
                    result['spotify_track'] = cleaned_title
                    result['spotify_artist'] = cleaned_artist
                    result['spotify_album'] = ''
                    result['matched_data'] = stub
                    result['spotify_data'] = stub
                    result['wing_it_fallback'] = True
                    state['wing_it_count'] = state.get('wing_it_count', 0) + 1

                state['discovery_results'].append(result)
                logger.info(f" {'' if matched_track else ''} Track {i+1}/{len(tracks)}: {result['status']}")
            except Exception as e:
                logger.error(f"Error processing track {i}: {e}")
                # Use .get here: the failure may have been a missing
                # 'name'/'artists' key, and re-indexing would raise again
                # from inside the handler and abort the whole playlist.
                _err_artists = track.get('artists')
                result = {
                    'index': i,
                    'yt_track': track.get('name', ''),
                    'yt_artist': _err_artists[0] if _err_artists else 'Unknown',
                    'status': 'Error',
                    'status_class': 'error',
                    'spotify_track': '',
                    'spotify_artist': '',
                    'spotify_album': '',
                    'duration': '0:00'
                }
                state['discovery_results'].append(result)

        # Complete discovery
        state['phase'] = 'discovered'
        state['status'] = 'complete'
        state['discovery_progress'] = 100

        # Sort results by index so array position matches result['index'].
        # Critical after retry where found results are kept at the front
        # and newly-discovered results are appended out of order.
        state['discovery_results'].sort(key=lambda r: r.get('index', 0))

        # Write back discovery results to DB for mirrored playlists
        if url_hash.startswith('mirrored_'):
            try:
                db = deps.get_database()
                for result in state['discovery_results']:
                    idx = result.get('index', -1)
                    if idx < 0 or idx >= len(tracks):
                        continue
                    db_track_id = tracks[idx].get('db_track_id')
                    if not db_track_id:
                        continue
                    if result.get('status_class') in ('found', 'wing-it') and result.get('matched_data'):
                        extra_data = {
                            'discovered': True,
                            'provider': result.get('discovery_source', discovery_source),
                            'confidence': result.get('confidence', 0),
                            'matched_data': result['matched_data'],
                        }
                        if result.get('manual_match'):
                            extra_data['manual_match'] = True
                        if result.get('wing_it_fallback'):
                            extra_data['wing_it_fallback'] = True
                            extra_data['provider'] = 'wing_it_fallback'
                        db.update_mirrored_track_extra_data(db_track_id, extra_data)
                    else:
                        extra_data = {
                            'discovered': False,
                            'discovery_attempted': True,
                            'provider': discovery_source,
                        }
                        db.update_mirrored_track_extra_data(db_track_id, extra_data)
                logger.info(f"Wrote discovery results to DB for {url_hash}")
            except Exception as wb_err:
                logger.error(f"Error writing discovery results to DB: {wb_err}")

        playlist_name = playlist['name']
        source_label = discovery_source.upper()
        wing_it_count = state.get('wing_it_count', 0)
        activity_msg = f"'{playlist_name}' - {state['spotify_matches']}/{len(tracks)} tracks found"
        if wing_it_count:
            activity_msg += f", {wing_it_count} wing it"
        deps.add_activity_item("", f"YouTube Discovery Complete ({source_label})", activity_msg, "Now")
        logger.info(f"YouTube discovery complete ({discovery_source}): {state['spotify_matches']}/{len(tracks)} tracks matched, {wing_it_count} wing it")
    except Exception as e:
        logger.error(f"Error in YouTube discovery worker: {e}")
        # state is still None when the pause hook or the url_hash lookup
        # failed; there is nothing to flag in that case.
        if state is not None:
            state['status'] = 'error'
            state['phase'] = 'fresh'
    finally:
        deps.resume_enrichment_workers(_ew_state, 'YouTube discovery')

@ -0,0 +1,392 @@
"""Tests for core/discovery/youtube.py — YouTube discovery worker."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import pytest
from core.discovery import youtube as dy
# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------
@dataclass
class _FakeMatch:
id: str = 'spt-1'
name: str = 'Found Title'
artists: list = None
album: str = 'Found Album'
duration_ms: int = 200000
image_url: str = ''
def __post_init__(self):
if self.artists is None:
self.artists = ['Found Artist']
class _FakeSpotifyClient:
def __init__(self, results=None, authenticated=True):
self._results = results or []
self._authenticated = authenticated
self.search_calls = []
def is_spotify_authenticated(self):
return self._authenticated
def search_tracks(self, query, limit=10):
self.search_calls.append((query, limit))
return self._results
class _FakeITunesClient:
def __init__(self, results=None):
self._results = results or []
self.search_calls = []
def search_tracks(self, query, limit=10):
self.search_calls.append((query, limit))
return self._results
class _FakeMatchingEngine:
def generate_download_queries(self, track):
return [f"{track.artists[0]} {track.name}"]
class _FakeDB:
def __init__(self, cache_match=None):
self._cache_match = cache_match
self.cache_saves = []
self.mirrored_updates = []
def get_discovery_cache_match(self, title, artist, source):
return self._cache_match
def save_discovery_cache_match(self, title, artist, source, conf, data, raw_t, raw_a):
self.cache_saves.append((title, artist, source, conf))
def update_mirrored_track_extra_data(self, db_track_id, extra_data):
self.mirrored_updates.append((db_track_id, extra_data))
class _FakeMetadataCache:
def get_entity(self, source, kind, entity_id):
return None
def _build_deps(
    *,
    states=None,
    spotify_results=None,
    spotify_auth=True,
    itunes_results=None,
    discovery_source='spotify',
    cache_match=None,
    pause_called=None,
    resume_called=None,
    rate_limited=False,
    activity_log=None,
    score_candidates_result=(None, 0.0, 0),
):
    """Assemble a `YoutubeDiscoveryDeps` wired entirely to in-memory fakes.

    Every keyword tunes one fake. The recording lists and fake objects are
    also attached to the returned instance as underscore attributes
    (``_db``, ``_spotify``, ``_itunes``, ``_pause_called``,
    ``_resume_called``, ``_activity_log``) for tests to assert against.
    """
    if pause_called is None:
        pause_called = []
    if resume_called is None:
        resume_called = []
    if activity_log is None:
        activity_log = []

    db = _FakeDB(cache_match=cache_match)
    spotify = _FakeSpotifyClient(results=spotify_results or [], authenticated=spotify_auth)
    itunes = _FakeITunesClient(results=itunes_results or [])

    def _pause(label):
        pause_called.append(label)
        return {'paused': True}

    def _resume(state, label):
        resume_called.append((state, label))

    def _cache_key(title, artist):
        return (title.lower(), artist.lower())

    def _artist_name(a):
        if isinstance(a, str):
            return a
        return a.get('name', '')

    def _wing_it_stub(title, artist, dur):
        return {'name': title, 'artists': [artist], 'duration_ms': dur, 'wing_it': True}

    def _record_activity(*a, **kw):
        activity_log.append((a, kw))

    deps = dy.YoutubeDiscoveryDeps(
        youtube_playlist_states={} if states is None else states,
        spotify_client=spotify,
        matching_engine=_FakeMatchingEngine(),
        pause_enrichment_workers=_pause,
        resume_enrichment_workers=_resume,
        get_active_discovery_source=lambda: discovery_source,
        get_metadata_fallback_client=lambda: itunes,
        get_discovery_cache_key=_cache_key,
        validate_discovery_cache_artist=lambda artist, m: True,
        extract_artist_name=_artist_name,
        spotify_rate_limited=lambda: rate_limited,
        discovery_score_candidates=lambda *args, **kw: score_candidates_result,
        get_metadata_cache=lambda: _FakeMetadataCache(),
        build_discovery_wing_it_stub=_wing_it_stub,
        get_database=lambda: db,
        add_activity_item=_record_activity,
    )

    # Expose the fakes and recorders for test assertions.
    deps._db = db
    deps._spotify = spotify
    deps._itunes = itunes
    deps._pause_called = pause_called
    deps._resume_called = resume_called
    deps._activity_log = activity_log
    return deps
def _seed_state(url_hash, states, *, tracks=None, phase='discovering'):
states[url_hash] = {
'phase': phase,
'playlist': {'name': 'Test Playlist', 'tracks': tracks or []},
'discovery_progress': 0,
'spotify_matches': 0,
'discovery_results': [],
}
def _track(name='Track1', artist='Artist1', duration_ms=180000):
return {'name': name, 'artists': [artist], 'duration_ms': duration_ms,
'raw_title': name, 'raw_artist': artist}
# ---------------------------------------------------------------------------
# Cache path
# ---------------------------------------------------------------------------
def test_cache_hit_skips_search():
    """A validated discovery-cache entry becomes the result without any live search."""
    playlist_states = {}
    _seed_state('h1', playlist_states, tracks=[_track()])
    cached_entry = dict(
        name='Cached Title',
        artists=['Cached Artist'],
        album={'name': 'Cached Album'},
    )
    deps = _build_deps(states=playlist_states, cache_match=cached_entry)

    dy.run_youtube_discovery_worker('h1', deps)

    worker_state = playlist_states['h1']
    assert worker_state['spotify_matches'] == 1
    first_result = worker_state['discovery_results'][0]
    assert first_result['status'] == 'Found'
    assert first_result['spotify_track'] == 'Cached Title'
    # The cache hit must short-circuit before Spotify is queried.
    assert deps._spotify.search_calls == []
# ---------------------------------------------------------------------------
# Match path (Strategy 1)
# ---------------------------------------------------------------------------
def test_strategy1_match_above_threshold():
    """A Strategy 1 candidate scored at 0.95 is recorded as Found and cached."""
    playlist_states = {}
    _seed_state('h2', playlist_states, tracks=[_track()])
    candidate = _FakeMatch()
    deps = _build_deps(
        states=playlist_states,
        spotify_results=[candidate],
        score_candidates_result=(candidate, 0.95, 0),
    )

    dy.run_youtube_discovery_worker('h2', deps)

    worker_state = playlist_states['h2']
    assert worker_state['spotify_matches'] == 1
    first_result = worker_state['discovery_results'][0]
    assert first_result['status'] == 'Found'
    assert first_result['confidence'] == 0.95
    # The accepted match must have been written to the discovery cache.
    assert deps._db.cache_saves
# ---------------------------------------------------------------------------
# Wing It fallback
# ---------------------------------------------------------------------------
def test_no_match_triggers_wing_it_fallback():
    """When no strategy yields a match the track falls back to a Wing It stub."""
    playlist_states = {}
    _seed_state('h3', playlist_states, tracks=[_track()])
    no_match = (None, 0.0, 0)
    deps = _build_deps(states=playlist_states, score_candidates_result=no_match)

    dy.run_youtube_discovery_worker('h3', deps)

    worker_state = playlist_states['h3']
    assert worker_state.get('wing_it_count') == 1
    entry = worker_state['discovery_results'][0]
    assert entry['status'] == 'Wing It'
    assert entry['wing_it_fallback'] is True
    assert entry['matched_data'].get('wing_it') is True
# ---------------------------------------------------------------------------
# iTunes fallback path
# ---------------------------------------------------------------------------
def test_itunes_fallback_when_spotify_unauthenticated():
    """With Spotify unauthenticated and source 'itunes', only iTunes is searched."""
    playlist_states = {}
    _seed_state('h4', playlist_states, tracks=[_track()])
    candidate = _FakeMatch()
    deps = _build_deps(
        states=playlist_states,
        spotify_auth=False,
        itunes_results=[candidate],
        discovery_source='itunes',
        score_candidates_result=(candidate, 0.92, 0),
    )

    dy.run_youtube_discovery_worker('h4', deps)

    itunes_calls = deps._itunes.search_calls
    spotify_calls = deps._spotify.search_calls
    assert itunes_calls
    assert spotify_calls == []  # spotify NOT called
def test_spotify_skipped_when_rate_limited():
    """Spotify globally rate-limited → Strategy 1 searches iTunes, never Spotify.

    The scorer returns a 0.95 match for the first iTunes result set, so the
    worker stops after Strategy 1 and never reaches the later strategies;
    Spotify's call log must therefore stay empty.
    """
    states = {}
    match = _FakeMatch()
    _seed_state('h5', states, tracks=[_track()])
    deps = _build_deps(states=states, rate_limited=True,
                       itunes_results=[match],
                       score_candidates_result=(match, 0.95, 0))
    dy.run_youtube_discovery_worker('h5', deps)
    assert deps._itunes.search_calls
    # Previously unasserted: this is the behavior the test name promises.
    assert deps._spotify.search_calls == []
    assert states['h5']['discovery_results'][0]['status'] == 'Found'
# ---------------------------------------------------------------------------
# Cancellation
# ---------------------------------------------------------------------------
def test_phase_changed_cancels_loop():
    """A phase other than 'discovering' makes the worker return without results."""
    playlist_states = {}
    two_tracks = [_track(), _track('T2', 'A2')]
    _seed_state('h6', playlist_states, tracks=two_tracks, phase='cancelled')
    deps = _build_deps(states=playlist_states)

    dy.run_youtube_discovery_worker('h6', deps)

    # The cancellation check fires on the very first iteration.
    produced = playlist_states['h6']['discovery_results']
    assert produced == []
def test_skip_discovery_flag_skips_track():
    """A track carrying skip_discovery produces no result, yet the run completes."""
    playlist_states = {}
    flagged = _track()
    flagged['skip_discovery'] = True
    _seed_state('h7', playlist_states, tracks=[flagged])
    deps = _build_deps(states=playlist_states)

    dy.run_youtube_discovery_worker('h7', deps)

    worker_state = playlist_states['h7']
    assert worker_state['discovery_results'] == []
    assert worker_state['phase'] == 'discovered'  # completes normally
# ---------------------------------------------------------------------------
# Completion: phase + activity feed
# ---------------------------------------------------------------------------
def test_completion_marks_phase_discovered():
    """A finished run leaves phase='discovered', status='complete', progress=100."""
    playlist_states = {}
    _seed_state('h8', playlist_states, tracks=[_track()])
    deps = _build_deps(states=playlist_states)

    dy.run_youtube_discovery_worker('h8', deps)

    worker_state = playlist_states['h8']
    assert worker_state['phase'] == 'discovered'
    assert worker_state['status'] == 'complete'
    assert worker_state['discovery_progress'] == 100
def test_activity_feed_logged_on_completion():
    """Completing discovery emits exactly one activity feed entry."""
    playlist_states = {}
    _seed_state('h9', playlist_states, tracks=[_track()])
    deps = _build_deps(states=playlist_states)

    dy.run_youtube_discovery_worker('h9', deps)

    feed = deps._activity_log
    assert len(feed) == 1
    positional, _kwargs = feed[0]
    # Positional layout used by the worker: (icon, title, message, timestamp).
    title = positional[1]
    msg = positional[2]
    assert 'YouTube Discovery Complete' in title
    assert 'Test Playlist' in msg
# ---------------------------------------------------------------------------
# Mirrored playlist DB writeback
# ---------------------------------------------------------------------------
def test_mirrored_playlist_writes_to_db():
    """url_hash starting with 'mirrored_' → discovery results written to DB.

    The default fakes return no search results, so the single track ends up
    as a Wing It fallback; the writeback must flag it as such.
    """
    states = {}
    tr = _track()
    tr['db_track_id'] = 'dbtid-1'
    _seed_state('mirrored_xyz', states, tracks=[tr])
    deps = _build_deps(states=states)
    dy.run_youtube_discovery_worker('mirrored_xyz', deps)
    assert len(deps._db.mirrored_updates) == 1
    db_track_id, extra = deps._db.mirrored_updates[0]
    assert db_track_id == 'dbtid-1'
    # Wing It (no match) → discovered=True with provider='wing_it_fallback'
    assert extra['discovered'] is True
    assert extra['provider'] == 'wing_it_fallback'
    assert extra['wing_it_fallback'] is True
def test_non_mirrored_playlist_no_db_writeback():
    """A url_hash without the 'mirrored_' prefix triggers no DB writeback."""
    playlist_states = {}
    _seed_state('regular_hash', playlist_states, tracks=[_track()])
    deps = _build_deps(states=playlist_states)

    dy.run_youtube_discovery_worker('regular_hash', deps)

    writebacks = deps._db.mirrored_updates
    assert writebacks == []
# ---------------------------------------------------------------------------
# Enrichment workers pause/resume
# ---------------------------------------------------------------------------
def test_enrichment_workers_paused_and_resumed():
    """Enrichment workers are paused once on entry and resumed on exit."""
    playlist_states = {}
    _seed_state('h10', playlist_states, tracks=[_track()])
    deps = _build_deps(states=playlist_states)

    dy.run_youtube_discovery_worker('h10', deps)

    pause_labels = deps._pause_called
    assert pause_labels == ['YouTube discovery']
    assert deps._resume_called  # resume called regardless
def test_error_during_loop_resets_phase_to_fresh():
    """Error mid-loop (after state lookup) → phase='fresh', status='error'.

    Two scenarios are checked:
    * a fault inside per-track processing is absorbed and the run completes;
    * a fault raised by the loop itself, outside the per-track handler,
      resets the state and still resumes the enrichment workers.
    """
    # Scenario 1: matching engine failure is handled per track.
    states = {}
    _seed_state('herr', states, tracks=[_track()])
    deps = _build_deps(states=states)
    deps.matching_engine = None  # AttributeError on .generate_download_queries
    dy.run_youtube_discovery_worker('herr', deps)
    assert states['herr']['phase'] == 'discovered'
    assert deps._resume_called

    # Scenario 2: iterating the track list blows up after the first track,
    # which happens in the `for` statement and bypasses the per-track handler.
    class _ExplodingTracks(list):
        def __iter__(self):
            yield from super().__iter__()
            raise RuntimeError('boom')

    states = {}
    _seed_state('herr2', states, tracks=_ExplodingTracks([_track()]))
    deps = _build_deps(states=states)
    dy.run_youtube_discovery_worker('herr2', deps)
    assert states['herr2']['status'] == 'error'
    assert states['herr2']['phase'] == 'fresh'
    assert deps._resume_called
# ---------------------------------------------------------------------------
# Sort by index
# ---------------------------------------------------------------------------
def test_results_sorted_by_index():
    """Results end up ordered by 'index' even when earlier entries were not."""
    playlist_states = {}
    three_tracks = [_track(f'T{n}', f'A{n}') for n in range(3)]
    _seed_state('h11', playlist_states, tracks=three_tracks)
    # Simulate leftovers from a retry that are out of index order.
    leftovers = [{'index': idx, 'status': 'pre'} for idx in (2, 0)]
    playlist_states['h11']['discovery_results'] = leftovers
    deps = _build_deps(states=playlist_states)

    dy.run_youtube_discovery_worker('h11', deps)

    observed = [entry['index'] for entry in playlist_states['h11']['discovery_results']]
    assert observed == sorted(observed)

@ -29148,338 +29148,35 @@ def _build_fix_modal_spotify_data(spotify_track):
}
def _run_youtube_discovery_worker(url_hash):
"""Background worker for YouTube music discovery process (Spotify preferred, iTunes fallback)"""
_ew_state = {}
try:
_ew_state = _pause_enrichment_workers('YouTube discovery')
state = youtube_playlist_states[url_hash]
playlist = state['playlist']
tracks = playlist['tracks']
# Determine which provider to use (Spotify preferred, iTunes fallback)
discovery_source = _get_active_discovery_source()
use_spotify = (discovery_source == 'spotify') and spotify_client and spotify_client.is_spotify_authenticated()
# Get fallback client
itunes_client = _get_metadata_fallback_client()
logger.info(f"Starting {discovery_source} discovery for {len(tracks)} YouTube tracks...")
# Store the discovery source in state
state['discovery_source'] = discovery_source
# Process each track for discovery
for i, track in enumerate(tracks):
try:
# Check for cancellation (phase changed by reset/delete/close)
if state.get('phase') != 'discovering':
logger.warning(f"Discovery cancelled for {url_hash} (phase changed to '{state.get('phase')}')")
return
# Update progress
state['discovery_progress'] = int((i / len(tracks)) * 100)
# Skip tracks flagged by retry (already found)
if track.get('skip_discovery'):
continue
# Search for track using active provider
cleaned_title = track['name']
cleaned_artist = track['artists'][0] if track['artists'] else 'Unknown Artist'
logger.info(f"Searching {discovery_source} for: '{cleaned_artist}' - '{cleaned_title}'")
# Check discovery cache first
cache_key = _get_discovery_cache_key(cleaned_title, cleaned_artist)
try:
cache_db = get_database()
cached_match = cache_db.get_discovery_cache_match(cache_key[0], cache_key[1], discovery_source)
if cached_match and _validate_discovery_cache_artist(cleaned_artist, cached_match):
logger.debug(f"CACHE HIT [{i+1}/{len(tracks)}]: {cleaned_artist} - {cleaned_title}")
result = {
'index': i,
'yt_track': cleaned_title,
'yt_artist': cleaned_artist,
'status': 'Found',
'status_class': 'found',
'spotify_track': cached_match.get('name', ''),
'spotify_artist': _extract_artist_name(cached_match.get('artists', [''])[0]) if cached_match.get('artists') else '',
'spotify_album': cached_match.get('album', {}).get('name', '') if isinstance(cached_match.get('album'), dict) else cached_match.get('album', ''),
'duration': f"{track['duration_ms'] // 60000}:{(track['duration_ms'] % 60000) // 1000:02d}" if track['duration_ms'] else '0:00',
'discovery_source': discovery_source,
'matched_data': cached_match,
'spotify_data': cached_match
}
state['spotify_matches'] += 1
state['discovery_results'].append(result)
continue
except Exception as cache_err:
logger.error(f"Cache lookup error: {cache_err}")
# Try multiple search strategies using matching engine
matched_track = None
best_confidence = 0.0
best_raw_track = None
min_confidence = 0.9
source_duration = track.get('duration_ms', 0) or 0
# Strategy 1: Use matching_engine search queries
try:
temp_track = type('TempTrack', (), {
'name': cleaned_title,
'artists': [cleaned_artist],
'album': None
})()
search_queries = matching_engine.generate_download_queries(temp_track)
logger.info(f"Generated {len(search_queries)} search queries for YouTube track")
except Exception as e:
logger.error(f"Matching engine failed for YouTube, falling back to basic query: {e}")
search_queries = [f"{cleaned_artist} {cleaned_title}", cleaned_title]
for query_idx, search_query in enumerate(search_queries):
try:
logger.debug(f"YouTube query {query_idx + 1}/{len(search_queries)}: {search_query}")
search_results = None
if use_spotify and not _spotify_rate_limited():
search_results = spotify_client.search_tracks(search_query, limit=10)
else:
search_results = itunes_client.search_tracks(search_query, limit=10)
if not search_results:
continue
# Score all results using the matching engine
match, confidence, match_idx = _discovery_score_candidates(
cleaned_title, cleaned_artist, source_duration, search_results
)
if match and confidence > best_confidence and confidence >= min_confidence:
best_confidence = confidence
matched_track = match
if use_spotify and match.id:
_cache = get_metadata_cache()
best_raw_track = _cache.get_entity('spotify', 'track', match.id)
else:
best_raw_track = None
logger.info(f"New best YouTube match: {match.artists[0]} - {match.name} (confidence: {confidence:.3f})")
if best_confidence >= 0.9:
logger.info(f"High confidence YouTube match found ({best_confidence:.3f}), stopping search")
break
except Exception as e:
logger.debug(f"Error in YouTube search for query '{search_query}': {e}")
continue
if matched_track:
logger.info(f"Strategy 1 YouTube match: {matched_track.artists[0]} - {matched_track.name} (confidence: {best_confidence:.3f})")
# Strategy 2: Swapped search (if first failed) - score results properly
if not matched_track:
logger.info("YouTube Strategy 2: Trying swapped search (artist/title reversed)")
if use_spotify:
query = f"artist:{cleaned_title} track:{cleaned_artist}"
fallback_results = spotify_client.search_tracks(query, limit=5)
else:
query = f"{cleaned_title} {cleaned_artist}"
fallback_results = itunes_client.search_tracks(query, limit=5)
if fallback_results:
match, confidence, _ = _discovery_score_candidates(
cleaned_title, cleaned_artist, source_duration, fallback_results
)
if match and confidence >= min_confidence:
matched_track = match
best_confidence = confidence
logger.info(f"Strategy 2 YouTube match (swapped): {match.artists[0]} - {match.name} (confidence: {confidence:.3f})")
# Strategy 3: Raw data search (if still failed) - score results properly
if not matched_track:
raw_title = track.get('raw_title', cleaned_title)
raw_artist = track.get('raw_artist', cleaned_artist)
logger.info(f"YouTube Strategy 3: Trying raw data search: '{raw_artist} {raw_title}'")
query = f"{raw_artist} {raw_title}"
if use_spotify:
fallback_results = spotify_client.search_tracks(query, limit=5)
else:
fallback_results = itunes_client.search_tracks(query, limit=5)
if fallback_results:
match, confidence, _ = _discovery_score_candidates(
cleaned_title, cleaned_artist, source_duration, fallback_results
)
if match and confidence >= min_confidence:
matched_track = match
best_confidence = confidence
logger.info(f"Strategy 3 YouTube match (raw): {match.artists[0]} - {match.name} (confidence: {confidence:.3f})")
# Strategy 4: Extended search with higher limit (last resort)
if not matched_track:
logger.info("YouTube Strategy 4: Extended search with limit=50")
query = f"{cleaned_artist} {cleaned_title}"
if use_spotify:
extended_results = spotify_client.search_tracks(query, limit=50)
else:
extended_results = itunes_client.search_tracks(query, limit=50)
if extended_results:
match, confidence, _ = _discovery_score_candidates(
cleaned_title, cleaned_artist, source_duration, extended_results
)
if match and confidence >= min_confidence:
matched_track = match
best_confidence = confidence
logger.info(f"Strategy 4 YouTube match (extended): {match.artists[0]} - {match.name} (confidence: {confidence:.3f})")
# Create result entry
result = {
'index': i,
'yt_track': cleaned_title,
'yt_artist': cleaned_artist,
'status': 'Found' if matched_track else 'Not Found',
'status_class': 'found' if matched_track else 'not-found',
'spotify_track': matched_track.name if matched_track else '',
'spotify_artist': _extract_artist_name(matched_track.artists[0]) if matched_track else '',
'spotify_album': matched_track.album if matched_track else '',
'duration': f"{track['duration_ms'] // 60000}:{(track['duration_ms'] % 60000) // 1000:02d}" if track['duration_ms'] else '0:00',
'discovery_source': discovery_source,
'confidence': best_confidence
}
if matched_track:
state['spotify_matches'] += 1
# Build album data based on provider
if use_spotify and best_raw_track:
album_data = best_raw_track.get('album', {})
else:
album_data = {
'name': matched_track.album,
'album_type': 'album',
'release_date': getattr(matched_track, 'release_date', '') or '',
'images': [{'url': matched_track.image_url}] if hasattr(matched_track, 'image_url') and matched_track.image_url else []
}
# Extract image URL for discovery pool display
_yt_album_images = album_data.get('images', [])
_yt_image_url = _yt_album_images[0].get('url', '') if _yt_album_images else (getattr(matched_track, 'image_url', '') or '')
result['matched_data'] = {
'id': matched_track.id,
'name': matched_track.name,
'artists': matched_track.artists,
'album': album_data,
'duration_ms': matched_track.duration_ms,
'image_url': _yt_image_url,
'source': discovery_source
}
result['spotify_data'] = result['matched_data']
# Save to discovery cache (only high-confidence matches)
if best_confidence >= 0.7:
try:
cache_db = get_database()
cache_db.save_discovery_cache_match(
cache_key[0], cache_key[1], discovery_source, best_confidence,
result['matched_data'], cleaned_title, cleaned_artist
)
logger.info(f"CACHE SAVED: {cleaned_artist} - {cleaned_title} (confidence: {best_confidence:.3f})")
except Exception as cache_err:
logger.error(f"Cache save error: {cache_err}")
else:
# Auto Wing It fallback — build stub from raw source data
stub = _build_discovery_wing_it_stub(cleaned_title, cleaned_artist, track.get('duration_ms', 0))
result['status'] = 'Wing It'
result['status_class'] = 'wing-it'
result['spotify_track'] = cleaned_title
result['spotify_artist'] = cleaned_artist
result['spotify_album'] = ''
result['matched_data'] = stub
result['spotify_data'] = stub
result['wing_it_fallback'] = True
state['wing_it_count'] = state.get('wing_it_count', 0) + 1
state['discovery_results'].append(result)
# YouTube discovery worker logic lives in core/discovery/youtube.py.
from core.discovery import youtube as _discovery_youtube
logger.info(f" {'' if matched_track else ''} Track {i+1}/{len(tracks)}: {result['status']}")
except Exception as e:
logger.error(f"Error processing track {i}: {e}")
result = {
'index': i,
'yt_track': track['name'],
'yt_artist': track['artists'][0] if track['artists'] else 'Unknown',
'status': 'Error',
'status_class': 'error',
'spotify_track': '',
'spotify_artist': '',
'spotify_album': '',
'duration': '0:00'
}
state['discovery_results'].append(result)
# Complete discovery
state['phase'] = 'discovered'
state['status'] = 'complete'
state['discovery_progress'] = 100
# Sort results by index so array position matches result['index'].
# Critical after retry where found results are kept at the front
# and newly-discovered results are appended out of order.
state['discovery_results'].sort(key=lambda r: r.get('index', 0))
# Write back discovery results to DB for mirrored playlists
if url_hash.startswith('mirrored_'):
try:
db = get_database()
for result in state['discovery_results']:
idx = result.get('index', -1)
if idx < 0 or idx >= len(tracks):
continue
db_track_id = tracks[idx].get('db_track_id')
if not db_track_id:
continue
if result.get('status_class') in ('found', 'wing-it') and result.get('matched_data'):
extra_data = {
'discovered': True,
'provider': result.get('discovery_source', discovery_source),
'confidence': result.get('confidence', 0),
'matched_data': result['matched_data'],
}
if result.get('manual_match'):
extra_data['manual_match'] = True
if result.get('wing_it_fallback'):
extra_data['wing_it_fallback'] = True
extra_data['provider'] = 'wing_it_fallback'
db.update_mirrored_track_extra_data(db_track_id, extra_data)
else:
extra_data = {
'discovered': False,
'discovery_attempted': True,
'provider': discovery_source,
}
db.update_mirrored_track_extra_data(db_track_id, extra_data)
logger.info(f"Wrote discovery results to DB for {url_hash}")
except Exception as wb_err:
logger.error(f"Error writing discovery results to DB: {wb_err}")
def _build_youtube_discovery_deps():
    """Assemble a fresh YoutubeDiscoveryDeps bundle from web_server.py globals.

    Every field is read from its module-level name at call time, so each
    invocation reflects whatever those globals are currently bound to.

    Returns:
        A new ``YoutubeDiscoveryDeps`` instance (from ``core.discovery.youtube``)
        wired with the clients, helpers and state dict the worker needs.
    """
    # Resolve the bundle class before gathering the keyword arguments.
    deps_cls = _discovery_youtube.YoutubeDiscoveryDeps
    wiring = {
        # Shared state and service clients.
        'youtube_playlist_states': youtube_playlist_states,
        'spotify_client': spotify_client,
        'matching_engine': matching_engine,
        # Enrichment worker pause/resume hooks.
        'pause_enrichment_workers': _pause_enrichment_workers,
        'resume_enrichment_workers': _resume_enrichment_workers,
        # Discovery source selection and metadata helpers.
        'get_active_discovery_source': _get_active_discovery_source,
        'get_metadata_fallback_client': _get_metadata_fallback_client,
        'get_discovery_cache_key': _get_discovery_cache_key,
        'validate_discovery_cache_artist': _validate_discovery_cache_artist,
        'extract_artist_name': _extract_artist_name,
        'spotify_rate_limited': _spotify_rate_limited,
        'discovery_score_candidates': _discovery_score_candidates,
        'get_metadata_cache': get_metadata_cache,
        'build_discovery_wing_it_stub': _build_discovery_wing_it_stub,
        # Persistence and activity feed.
        'get_database': get_database,
        'add_activity_item': add_activity_item,
    }
    return deps_cls(**wiring)
playlist_name = playlist['name']
source_label = discovery_source.upper()
wing_it_count = state.get('wing_it_count', 0)
activity_msg = f"'{playlist_name}' - {state['spotify_matches']}/{len(tracks)} tracks found"
if wing_it_count:
activity_msg += f", {wing_it_count} wing it"
add_activity_item("", f"YouTube Discovery Complete ({source_label})", activity_msg, "Now")
logger.info(f"YouTube discovery complete ({discovery_source}): {state['spotify_matches']}/{len(tracks)} tracks matched, {wing_it_count} wing it")
def _run_youtube_discovery_worker(url_hash):
    """Run YouTube discovery for ``url_hash`` via the lifted worker.

    Thin wrapper: forwards ``url_hash`` together with a newly built dependency
    bundle to ``_discovery_youtube.run_youtube_discovery_worker`` and returns
    whatever that call returns.
    """
    worker = _discovery_youtube.run_youtube_discovery_worker
    deps = _build_youtube_discovery_deps()
    return worker(url_hash, deps)
except Exception as e:
logger.error(f"Error in YouTube discovery worker: {e}")
state['status'] = 'error'
state['phase'] = 'fresh'
finally:
_resume_enrichment_workers(_ew_state, 'YouTube discovery')
def _run_listenbrainz_discovery_worker(state_key):
"""Background worker for ListenBrainz music discovery process (Spotify preferred, iTunes fallback)"""

Loading…
Cancel
Save