PR5f: lift _run_beatport_discovery_worker to core/discovery/beatport.py

Sixth lift in the PR5 discovery-workers series. Pulls the 323-line
Beatport chart discovery worker out of `web_server.py` into its own
focused module under `core/discovery/`. Pure 1:1 lift — wrapper keeps
the original entry-point name.

What the Beatport discovery worker does:

1. Pause enrichment workers (release shared resources).
2. For each Beatport track:
   - Cancellation gate (state['phase'] != 'discovering').
   - Clean Beatport text (artist/title) of common annotations via
     `clean_beatport_text` helper.
   - Single-string artist normalization for "CID,Taylr Renee"-style
     entries — split on comma, take the first.
   - Discovery cache lookup; cache hit short-circuits the search and
     normalizes cached artists from ['str'] → [{'name': 'str'}] to
     match the frontend's expected list-of-objects shape.
   - matching_engine search-query generation (falls back to basic
     artist/title queries if the engine raises); a candidate must score
     min_confidence >= 0.9 to be accepted, to avoid bad matches.
   - Strategy 1: scored candidates from initial Spotify/iTunes searches.
   - Strategy 4: extended search with limit=50 if no high-confidence
     match found.
   - On Spotify match: format artists as [{'name': str}] objects, pull
     full album object from raw cache when available, fallback to
     reconstructed album dict otherwise.
   - On iTunes match: format with image_url-derived album.images entry
     (300x300 spec), source set to discovery_source.
   - Save matched result to discovery cache when confidence >= 0.75
     (note: every accepted match has already cleared the 0.9 search
     threshold, so this lower gate never rejects a match in this worker).
   - On miss: Wing It stub stored with status 'found' and status_class
     'wing-it' (counted as a match; wing_it_count incremented).
3. After all tracks: phase='discovered', activity feed entry, sync
   discovery results back to mirrored playlist via
   `_sync_discovery_results_to_mirrored` with 'beatport' tag.
4. On error: state['phase']='fresh' + status='error'.
5. Finally: resume enrichment workers.
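
The two artist-shape conversions in step 2 can be sketched in isolation.
This is a minimal illustration, not the lifted code: `first_artist` and
`artists_as_objects` are hypothetical helper names, and the
`clean_beatport_text` pass the worker applies is left out.

```python
from typing import Any


def first_artist(track_artists: Any, default: str = "Unknown Artist") -> str:
    """Pick a single artist string from Beatport's list-or-string field."""
    if isinstance(track_artists, list):
        if track_artists and isinstance(track_artists[0], str):
            # ["CID,Taylr Renee"] -> "CID": split on comma, keep the first
            return track_artists[0].split(",")[0].strip()
        return str(track_artists[0]) if track_artists else default
    return str(track_artists)


def artists_as_objects(artists: list) -> list:
    """Convert ['str'] to [{'name': 'str'}]; leave other shapes untouched."""
    if artists and isinstance(artists[0], str):
        return [{"name": a} for a in artists]
    return artists
```

Only the first list element is inspected in both helpers, which mirrors
how the worker decides the shape of the whole list.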

Dependencies injected via `BeatportDiscoveryDeps` (17 fields) —
beatport_chart_states, spotify_client, matching_engine, plus 14
callable helpers (pause/resume enrichment, get_active_discovery_source,
get_metadata_fallback_client, clean_beatport_text,
get_discovery_cache_key, get_database, validate_discovery_cache_artist,
spotify_rate_limited, discovery_score_candidates, get_metadata_cache,
build_discovery_wing_it_stub, add_activity_item,
sync_discovery_results_to_mirrored).

After mapping each `deps.X` reference back to its original global, the
diff against the original body shows **zero differences**: 323 original
lines = 323 lifted lines, byte-identical (including all whitespace,
comments, and log strings).
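
One way such a check could be scripted, as a sketch only: the mapping
below lists four names taken from this diff and would need all 17
entries for a full comparison. `to_global_form` and `body_diff` are
hypothetical helper names, not part of the PR.

```python
import difflib
import re

# Lifted name -> original global name (partial, illustrative mapping).
GLOBAL_NAMES = {
    "pause_enrichment_workers": "_pause_enrichment_workers",
    "get_active_discovery_source": "_get_active_discovery_source",
    "clean_beatport_text": "clean_beatport_text",
    "spotify_client": "spotify_client",
}

_DEPS_REF = re.compile(r"\bdeps\.(\w+)")


def to_global_form(line: str) -> str:
    """Rewrite `deps.X` references back to the original global names."""
    return _DEPS_REF.sub(
        lambda m: GLOBAL_NAMES.get(m.group(1), m.group(1)), line
    )


def body_diff(original: list, lifted: list) -> list:
    """Unified diff of the original body vs. the normalized lifted body."""
    normalized = [to_global_form(line) for line in lifted]
    return list(difflib.unified_diff(original, normalized, lineterm=""))
```

An empty list from `body_diff` means the two bodies match line for line
after normalization.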

Tests: 12 new under tests/discovery/test_discovery_beatport.py covering
cache hit short-circuit (with cached-artist normalization), Spotify
match formatting (list and string artist inputs), iTunes match
(image_url to album.images), Wing It fallback, cancellation
(phase change), completion phase update, activity feed entry, mirrored
sync invocation, top-level error handler, per-track error handling,
comma-separated artist split.

Full suite: 1130 passing (was 1118). Ruff clean.
pull/410/head
Broque Thomas 4 weeks ago
parent c95e04988e
commit 04647eb9f7

@@ -0,0 +1,388 @@
"""Background worker for Beatport chart discovery.

`run_beatport_discovery_worker(url_hash, deps)` is the function the
beatport discovery start-endpoint submits to its executor to match each
Beatport chart track against Spotify (preferred) or iTunes (fallback):

1. Pause enrichment workers (release shared resources).
2. For each Beatport track:
   - Cancellation gate (state['phase'] != 'discovering').
   - Clean Beatport text (artist/title) of common annotations.
   - Discovery cache lookup; cache hit short-circuits the search and
     normalizes cached artists from ['str'] to [{'name': 'str'}].
   - matching_engine search-query generation; a candidate must score
     min_confidence >= 0.9 to be accepted, to avoid bad matches.
   - Strategy 1: scored candidates from initial Spotify/iTunes searches.
   - Strategy 4: extended search with limit=50 if no high-confidence
     match found.
   - On Spotify match: format artists as [{'name': str}] objects, pull
     full album object from raw cache when available.
   - On iTunes match: format with image_url-derived album.images entry.
   - Save matched result to discovery cache when confidence >= 0.75.
   - On miss: Wing It stub stored with status 'found' and status_class
     'wing-it' (counted as a match).
3. After all tracks: phase='discovered', activity feed entry, sync
   discovery results back to mirrored playlist via
   `_sync_discovery_results_to_mirrored`.
4. On error: state['phase']='fresh' + status='error'.
5. Finally: resume enrichment workers.

Lifted verbatim from web_server.py. Wide dependency surface (Spotify
and iTunes clients, matching engine, multiple discovery helpers, state
dict, mirrored sync) all injected via `BeatportDiscoveryDeps`.
"""

from __future__ import annotations

import logging
import time
from dataclasses import dataclass
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class BeatportDiscoveryDeps:
    """Bundle of cross-cutting deps the Beatport discovery worker needs."""
    beatport_chart_states: dict
    spotify_client: Any
    matching_engine: Any
    pause_enrichment_workers: Callable[[str], dict]
    resume_enrichment_workers: Callable[[dict, str], None]
    get_active_discovery_source: Callable[[], str]
    get_metadata_fallback_client: Callable[[], Any]
    clean_beatport_text: Callable[[str], str]
    get_discovery_cache_key: Callable
    get_database: Callable[[], Any]
    validate_discovery_cache_artist: Callable
    spotify_rate_limited: Callable[[], bool]
    discovery_score_candidates: Callable
    get_metadata_cache: Callable[[], Any]
    build_discovery_wing_it_stub: Callable
    add_activity_item: Callable
    sync_discovery_results_to_mirrored: Callable


def run_beatport_discovery_worker(url_hash, deps: BeatportDiscoveryDeps):
    """Background worker for Beatport discovery process (Spotify preferred, iTunes fallback)"""
    _ew_state = {}
    try:
        _ew_state = deps.pause_enrichment_workers('Beatport discovery')
        state = deps.beatport_chart_states[url_hash]
        chart = state['chart']
        tracks = chart['tracks']
        # Determine which provider to use
        discovery_source = deps.get_active_discovery_source()
        use_spotify = (discovery_source == 'spotify') and deps.spotify_client and deps.spotify_client.is_spotify_authenticated()
        # Initialize fallback client if needed
        itunes_client_instance = None
        if not use_spotify:
            itunes_client_instance = deps.get_metadata_fallback_client()
        logger.info(f"Starting {discovery_source.upper()} discovery for {len(tracks)} Beatport tracks...")
        # Store discovery source in state for frontend
        state['discovery_source'] = discovery_source
        # Process each track for discovery
        for i, track in enumerate(tracks):
            try:
                # Check for cancellation
                if state.get('phase') != 'discovering':
                    logger.warning(f"Beatport discovery cancelled (phase changed to '{state.get('phase')}')")
                    return
                # Update progress
                state['discovery_progress'] = int((i / len(tracks)) * 100)
                # Get track info from Beatport data (frontend sends 'name' and 'artists' fields)
                track_title = deps.clean_beatport_text(track.get('name', 'Unknown Title'))
                track_artists = track.get('artists', ['Unknown Artist'])
                # Handle artists - could be a list or string
                if isinstance(track_artists, list):
                    if len(track_artists) > 0 and isinstance(track_artists[0], str):
                        # Handle case like ["CID,Taylr Renee"] - split on comma and clean
                        track_artist = deps.clean_beatport_text(track_artists[0].split(',')[0].strip())
                    else:
                        track_artist = deps.clean_beatport_text(track_artists[0] if track_artists else 'Unknown Artist')
                else:
                    track_artist = deps.clean_beatport_text(str(track_artists))
                logger.debug(f"Searching {discovery_source.upper()} for: '{track_artist}' - '{track_title}'")
                # Check discovery cache first
                cache_key = deps.get_discovery_cache_key(track_title, track_artist)
                try:
                    cache_db = deps.get_database()
                    cached_match = cache_db.get_discovery_cache_match(cache_key[0], cache_key[1], discovery_source)
                    if cached_match and deps.validate_discovery_cache_artist(track_artist, cached_match):
                        logger.debug(f"CACHE HIT [{i+1}/{len(tracks)}]: {track_artist} - {track_title}")
                        # Convert artists from ['str'] to [{'name': 'str'}] for Beatport frontend format
                        beatport_artists = cached_match.get('artists', [])
                        if beatport_artists and isinstance(beatport_artists[0], str):
                            cached_match['artists'] = [{'name': a} for a in beatport_artists]
                        result_entry = {
                            'index': i,
                            'beatport_track': {
                                'title': track_title,
                                'artist': track_artist
                            },
                            'status': 'found',
                            'status_class': 'found',
                            'discovery_source': discovery_source,
                            'spotify_data': cached_match
                        }
                        state['spotify_matches'] += 1
                        state['discovery_results'].append(result_entry)
                        continue
                except Exception as cache_err:
                    logger.error(f"Cache lookup error: {cache_err}")
                # Use matching engine for track matching
                found_track = None
                best_confidence = 0.0
                best_raw_track = None
                min_confidence = 0.9  # Higher threshold for Beatport to avoid bad matches
                # Generate search queries using matching engine (with fallback)
                try:
                    temp_track = type('TempTrack', (), {
                        'name': track_title,
                        'artists': [track_artist],
                        'album': None
                    })()
                    search_queries = deps.matching_engine.generate_download_queries(temp_track)
                    logger.debug(f"Generated {len(search_queries)} search queries using matching engine")
                except Exception as e:
                    logger.error(f"Matching engine failed for Beatport, falling back to basic queries: {e}")
                    if use_spotify:
                        search_queries = [
                            f"{track_artist} {track_title}",
                            f'artist:"{track_artist}" track:"{track_title}"',
                            f'"{track_artist}" "{track_title}"'
                        ]
                    else:
                        search_queries = [
                            f"{track_artist} {track_title}",
                            f"{track_title} {track_artist}",
                            track_title
                        ]
                for query_idx, search_query in enumerate(search_queries):
                    try:
                        logger.debug(f"Query {query_idx + 1}/{len(search_queries)}: {search_query} ({discovery_source.upper()})")
                        search_results = None
                        if use_spotify and not deps.spotify_rate_limited():
                            search_results = deps.spotify_client.search_tracks(search_query, limit=10)
                        else:
                            search_results = itunes_client_instance.search_tracks(search_query, limit=10)
                        if not search_results:
                            continue
                        # Score all results using the matching engine
                        match, confidence, match_idx = deps.discovery_score_candidates(
                            track_title, track_artist, 0, search_results
                        )
                        if match and confidence > best_confidence and confidence >= min_confidence:
                            best_confidence = confidence
                            found_track = match
                            if use_spotify and match.id:
                                _cache = deps.get_metadata_cache()
                                best_raw_track = _cache.get_entity('spotify', 'track', match.id)
                            else:
                                best_raw_track = None
                            logger.debug(f"New best Beatport match: {match.artists[0]} - {match.name} (confidence: {confidence:.3f})")
                        if best_confidence >= 0.9:
                            logger.debug(f"High confidence match found ({best_confidence:.3f}), stopping search")
                            break
                    except Exception as e:
                        logger.debug(f"Error in {discovery_source.upper()} search for query '{search_query}': {e}")
                        continue
                # Strategy 4: Extended search with higher limit (last resort)
                if not found_track:
                    logger.debug("Beatport Strategy 4: Extended search with limit=50")
                    query = f"{track_artist} {track_title}"
                    if use_spotify:
                        extended_results = deps.spotify_client.search_tracks(query, limit=50)
                    else:
                        extended_results = itunes_client_instance.search_tracks(query, limit=50)
                    if extended_results:
                        match, confidence, _ = deps.discovery_score_candidates(
                            track_title, track_artist, 0, extended_results
                        )
                        if match and confidence >= min_confidence:
                            found_track = match
                            best_confidence = confidence
                            logger.debug(f"Strategy 4 Beatport match (extended): {match.artists[0]} - {match.name} (confidence: {confidence:.3f})")
                if found_track:
                    logger.info(f"Final Beatport match: {found_track.artists[0]} - {found_track.name} (confidence: {best_confidence:.3f})")
                else:
                    logger.warning(f"No suitable match found (best confidence was {best_confidence:.3f}, required {min_confidence:.3f})")
                # Create result entry
                result_entry = {
                    'index': i,  # Add index for frontend table row identification
                    'beatport_track': {
                        'title': track_title,
                        'artist': track_artist
                    },
                    'status': 'found' if found_track else 'not_found',
                    'status_class': 'found' if found_track else 'not-found',
                    'discovery_source': discovery_source,
                    'confidence': best_confidence
                }
                if found_track:
                    if use_spotify:
                        # SPOTIFY result formatting
                        # Debug: show available attributes
                        logger.debug(f"Spotify track attributes: {dir(found_track)}")
                        # Format artists correctly for frontend compatibility
                        formatted_artists = []
                        if isinstance(found_track.artists, list):
                            # If it's already a list of strings, convert to objects with 'name' property
                            for artist in found_track.artists:
                                if isinstance(artist, str):
                                    formatted_artists.append({'name': artist})
                                else:
                                    # If it's already an object, use as-is
                                    formatted_artists.append(artist)
                        else:
                            # Single artist case
                            formatted_artists = [{'name': str(found_track.artists)}]
                        # Use full album object from raw Spotify data if available
                        album_data = best_raw_track.get('album', {}) if best_raw_track else {}
                        if not album_data:
                            # Fallback to string album name
                            album_data = {'name': found_track.album, 'album_type': 'album', 'release_date': getattr(found_track, 'release_date', '') or '', 'images': []}
                        result_entry['spotify_data'] = {
                            'name': found_track.name,
                            'artists': formatted_artists,  # Now formatted as list of objects with 'name' property
                            'album': album_data,  # Full album object with images
                            'id': found_track.id,
                            'source': 'spotify'
                        }
                    else:
                        # ITUNES result formatting
                        # Note: iTunes Track dataclass has 'artists' (list) and 'image_url', not 'artist' and 'artwork_url'
                        result_artists = found_track.artists if hasattr(found_track, 'artists') else []
                        result_artist = result_artists[0] if result_artists else 'Unknown'
                        result_name = found_track.name if hasattr(found_track, 'name') else 'Unknown'
                        album_name = found_track.album if hasattr(found_track, 'album') else 'Unknown Album'
                        image_url = found_track.image_url if hasattr(found_track, 'image_url') else ''
                        track_id = found_track.id if hasattr(found_track, 'id') else ''
                        # Format artists as list of objects for frontend compatibility
                        formatted_artists = [{'name': result_artist}]
                        # Build album data with artwork
                        album_data = {
                            'name': album_name,
                            'album_type': 'album',
                            'release_date': getattr(found_track, 'release_date', '') or '',
                            'images': [{'url': image_url, 'height': 300, 'width': 300}] if image_url else []
                        }
                        result_entry['spotify_data'] = {  # Use same key for frontend compatibility
                            'name': result_name,
                            'artists': formatted_artists,
                            'album': album_data,
                            'id': track_id,
                            'source': discovery_source
                        }
                    state['spotify_matches'] += 1
                    # Save to discovery cache (normalize artists from [{name:str}] to [str] for canonical format)
                    if best_confidence >= 0.75:
                        try:
                            cache_data = dict(result_entry['spotify_data'])
                            cache_artists = cache_data.get('artists', [])
                            if cache_artists and isinstance(cache_artists[0], dict):
                                cache_data['artists'] = [a.get('name', '') for a in cache_artists]
                            # Extract image URL for discovery pool display
                            if 'image_url' not in cache_data:
                                _bp_album = cache_data.get('album', {})
                                _bp_images = _bp_album.get('images', []) if isinstance(_bp_album, dict) else []
                                cache_data['image_url'] = _bp_images[0].get('url', '') if _bp_images else ''
                            cache_db = deps.get_database()
                            cache_db.save_discovery_cache_match(
                                cache_key[0], cache_key[1], discovery_source, best_confidence,
                                cache_data, track_title, track_artist
                            )
                            logger.debug(f"CACHE SAVED: {track_artist} - {track_title} (confidence: {best_confidence:.3f})")
                        except Exception as cache_err:
                            logger.error(f"Cache save error: {cache_err}")
                # Auto Wing It fallback for unmatched tracks
                if result_entry.get('status_class') == 'not-found':
                    bp_t = result_entry.get('beatport_track', {})
                    stub = deps.build_discovery_wing_it_stub(
                        bp_t.get('title', ''),
                        bp_t.get('artist', ''),
                    )
                    result_entry['status'] = 'found'
                    result_entry['status_class'] = 'wing-it'
                    result_entry['spotify_data'] = stub
                    result_entry['match_data'] = stub
                    result_entry['wing_it_fallback'] = True
                    result_entry['confidence'] = 0
                    state['spotify_matches'] = state.get('spotify_matches', 0) + 1
                    state['wing_it_count'] = state.get('wing_it_count', 0) + 1
                state['discovery_results'].append(result_entry)
                # Small delay to avoid rate limiting
                time.sleep(0.1)
            except Exception as e:
                logger.error(f"Error processing Beatport track {i}: {e}")
                # Add error result
                state['discovery_results'].append({
                    'index': i,  # Add index for frontend table row identification
                    'beatport_track': {
                        'title': track.get('name', 'Unknown'),  # Changed from 'title' to 'name' to match track structure
                        'artist': track.get('artists', ['Unknown'])[0] if isinstance(track.get('artists'), list) else 'Unknown'
                    },
                    'status': 'error',
                    'status_class': 'error',  # Add status class for CSS styling
                    'error': str(e),
                    'discovery_source': discovery_source
                })
        # Mark discovery as complete
        state['discovery_progress'] = 100
        state['phase'] = 'discovered'
        state['status'] = 'discovered'
        # Add activity for completion
        chart_name = chart.get('name', 'Unknown Chart')
        source_label = discovery_source.upper()
        deps.add_activity_item("", f"Beatport Discovery Complete ({source_label})",
                               f"'{chart_name}' - {state['spotify_matches']}/{len(tracks)} tracks found", "Now")
        logger.info(f"Beatport discovery complete ({source_label}): {state['spotify_matches']}/{len(tracks)} tracks found")
        # Sync discovery results back to mirrored playlist
        deps.sync_discovery_results_to_mirrored('beatport', url_hash, state.get('discovery_results', []), discovery_source, profile_id=state.get('_profile_id', 1))
    except Exception as e:
        logger.error(f"Error in Beatport discovery worker: {e}")
        if url_hash in deps.beatport_chart_states:
            deps.beatport_chart_states[url_hash]['status'] = 'error'
            deps.beatport_chart_states[url_hash]['phase'] = 'fresh'
    finally:
        deps.resume_enrichment_workers(_ew_state, 'Beatport discovery')

@@ -0,0 +1,356 @@
"""Tests for core/discovery/beatport.py: Beatport chart discovery worker."""

from __future__ import annotations

from dataclasses import dataclass

import pytest

from core.discovery import beatport as db


# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------


@dataclass
class _FakeMatch:
    id: str = 'spt-1'
    name: str = 'Found'
    artists: list = None
    album: str = 'Found Album'
    duration_ms: int = 200000
    image_url: str = ''
    release_date: str = '2024-01-01'

    def __post_init__(self):
        if self.artists is None:
            self.artists = ['Found Artist']


class _FakeSpotifyClient:
    def __init__(self, results=None, authenticated=True):
        self._results = results if results is not None else []
        self._authenticated = authenticated
        self.search_calls = []

    def is_spotify_authenticated(self):
        return self._authenticated

    def search_tracks(self, query, limit=10):
        self.search_calls.append((query, limit))
        return self._results


class _FakeITunesClient:
    def __init__(self, results=None):
        self._results = results if results is not None else []
        self.search_calls = []

    def search_tracks(self, query, limit=10):
        self.search_calls.append((query, limit))
        return self._results


class _FakeMatchingEngine:
    def generate_download_queries(self, t):
        return [f"{t.artists[0]} {t.name}"]


class _FakeDB:
    def __init__(self, cache_match=None):
        self._cache_match = cache_match
        self.cache_saves = []

    def get_discovery_cache_match(self, t, a, src):
        return self._cache_match

    def save_discovery_cache_match(self, t, a, src, conf, data, raw_t, raw_a):
        self.cache_saves.append((t, a, src, conf))


class _FakeMetadataCache:
    def get_entity(self, source, kind, entity_id):
        return None


def _build_deps(
    *,
    states=None,
    spotify_results=None,
    spotify_auth=True,
    itunes_results=None,
    discovery_source='spotify',
    cache_match=None,
    rate_limited=False,
    score_result=(None, 0.0, 0),
    activity_log=None,
    sync_calls=None,
):
    activity_log = activity_log if activity_log is not None else []
    sync_calls = sync_calls if sync_calls is not None else []
    db_inst = _FakeDB(cache_match=cache_match)
    spotify = _FakeSpotifyClient(results=spotify_results or [], authenticated=spotify_auth)
    itunes = _FakeITunesClient(results=itunes_results or [])
    deps = db.BeatportDiscoveryDeps(
        beatport_chart_states=states if states is not None else {},
        spotify_client=spotify,
        matching_engine=_FakeMatchingEngine(),
        pause_enrichment_workers=lambda label: {'paused': True},
        resume_enrichment_workers=lambda state, label: None,
        get_active_discovery_source=lambda: discovery_source,
        get_metadata_fallback_client=lambda: itunes,
        clean_beatport_text=lambda s: (s or '').strip(),
        get_discovery_cache_key=lambda title, artist: (title.lower(), artist.lower()),
        get_database=lambda: db_inst,
        validate_discovery_cache_artist=lambda artist, m: True,
        spotify_rate_limited=lambda: rate_limited,
        discovery_score_candidates=lambda *args, **kw: score_result,
        get_metadata_cache=lambda: _FakeMetadataCache(),
        build_discovery_wing_it_stub=lambda title, artist: {
            'name': title, 'artists': [artist], 'wing_it': True
        },
        add_activity_item=lambda *a, **kw: activity_log.append((a, kw)),
        sync_discovery_results_to_mirrored=lambda *a, **kw: sync_calls.append((a, kw)),
    )
    deps._db = db_inst
    deps._spotify = spotify
    deps._itunes = itunes
    deps._activity_log = activity_log
    deps._sync_calls = sync_calls
    return deps


def _seed_state(url_hash, states, *, tracks=None, phase='discovering'):
    states[url_hash] = {
        'chart': {'name': 'Top 100', 'tracks': tracks or []},
        'phase': phase,
        'spotify_matches': 0,
        'discovery_results': [],
        'discovery_progress': 0,
    }


def _track(name='Track', artists=None):
    return {'name': name, 'artists': artists or ['Artist']}


# ---------------------------------------------------------------------------
# Cache hit
# ---------------------------------------------------------------------------


def test_cache_hit_short_circuits():
    """Cache hit appends Found result, normalizes artists ['str'] → [{'name'}]."""
    states = {}
    cached = {'name': 'Cached', 'artists': ['CA']}
    _seed_state('h1', states, tracks=[_track()])
    deps = _build_deps(states=states, cache_match=cached)
    db.run_beatport_discovery_worker('h1', deps)
    state = states['h1']
    assert state['spotify_matches'] == 1
    result = state['discovery_results'][0]
    assert result['status'] == 'found'
    assert deps._spotify.search_calls == []  # no live search
    # artists normalized in the cached_match passed by reference
    assert cached['artists'] == [{'name': 'CA'}]


# ---------------------------------------------------------------------------
# Spotify match path
# ---------------------------------------------------------------------------


def test_spotify_match_formats_artists_as_objects():
    """Spotify match → artists formatted as [{'name': str}] for frontend."""
    states = {}
    match = _FakeMatch(artists=['A1', 'A2'])
    _seed_state('h2', states, tracks=[_track()])
    deps = _build_deps(states=states, spotify_results=[match],
                       score_result=(match, 0.95, 0))
    db.run_beatport_discovery_worker('h2', deps)
    result = states['h2']['discovery_results'][0]
    assert result['status'] == 'found'
    assert result['confidence'] == 0.95
    assert result['spotify_data']['artists'] == [{'name': 'A1'}, {'name': 'A2'}]
    assert deps._db.cache_saves  # cached


def test_spotify_artists_string_format_normalized():
    """Spotify result with single string artist → list-of-objects format."""
    states = {}
    match = _FakeMatch(artists='SoloArtist')  # single string
    _seed_state('h3', states, tracks=[_track()])
    deps = _build_deps(states=states, spotify_results=[match],
                       score_result=(match, 0.95, 0))
    db.run_beatport_discovery_worker('h3', deps)
    result = states['h3']['discovery_results'][0]
    assert result['spotify_data']['artists'] == [{'name': 'SoloArtist'}]


# ---------------------------------------------------------------------------
# iTunes match path
# ---------------------------------------------------------------------------


def test_itunes_match_includes_image_url():
    """iTunes match → album.images includes image_url object."""
    states = {}
    match = _FakeMatch(image_url='http://it', name='iName', artists=['iA'])
    _seed_state('h4', states, tracks=[_track()])
    deps = _build_deps(
        states=states, spotify_auth=False, discovery_source='itunes',
        itunes_results=[match], score_result=(match, 0.95, 0),
    )
    db.run_beatport_discovery_worker('h4', deps)
    result = states['h4']['discovery_results'][0]
    assert result['spotify_data']['source'] == 'itunes'
    images = result['spotify_data']['album']['images']
    assert images[0]['url'] == 'http://it'


# ---------------------------------------------------------------------------
# Wing It fallback
# ---------------------------------------------------------------------------


def test_no_match_falls_back_to_wing_it():
    """No high-confidence match → Wing It stub stored."""
    states = {}
    _seed_state('h5', states, tracks=[_track()])
    deps = _build_deps(states=states, score_result=(None, 0.0, 0))
    db.run_beatport_discovery_worker('h5', deps)
    state = states['h5']
    assert state.get('wing_it_count') == 1
    result = state['discovery_results'][0]
    assert result['wing_it_fallback'] is True
    assert result['status_class'] == 'wing-it'


# ---------------------------------------------------------------------------
# Cancellation
# ---------------------------------------------------------------------------


def test_phase_changed_aborts():
    """state['phase'] != 'discovering' aborts immediately."""
    states = {}
    _seed_state('h6', states, tracks=[_track(), _track('T2')], phase='cancelled')
    deps = _build_deps(states=states)
    db.run_beatport_discovery_worker('h6', deps)
    assert states['h6']['discovery_results'] == []


# ---------------------------------------------------------------------------
# Completion
# ---------------------------------------------------------------------------


def test_completion_marks_discovered():
    """Completion → phase='discovered', status='discovered', progress=100."""
    states = {}
    _seed_state('h7', states, tracks=[_track()])
    deps = _build_deps(states=states)
    db.run_beatport_discovery_worker('h7', deps)
    assert states['h7']['phase'] == 'discovered'
    assert states['h7']['status'] == 'discovered'
    assert states['h7']['discovery_progress'] == 100


def test_activity_feed_logged():
    """Completion appends activity feed entry mentioning Beatport Discovery Complete."""
    states = {}
    _seed_state('h8', states, tracks=[_track()])
    deps = _build_deps(states=states)
    db.run_beatport_discovery_worker('h8', deps)
    args, _ = deps._activity_log[0]
    title = args[1]
    assert 'Beatport Discovery Complete' in title


# ---------------------------------------------------------------------------
# Mirrored sync
# ---------------------------------------------------------------------------


def test_sync_to_mirrored_invoked():
    """Completion calls sync_discovery_results_to_mirrored with 'beatport' tag."""
    states = {}
    _seed_state('h9', states, tracks=[_track()])
    states['h9']['_profile_id'] = 3
    deps = _build_deps(states=states)
    db.run_beatport_discovery_worker('h9', deps)
    assert len(deps._sync_calls) == 1
    args, kwargs = deps._sync_calls[0]
    assert args[0] == 'beatport'
    assert args[1] == 'h9'
    assert kwargs.get('profile_id') == 3


# ---------------------------------------------------------------------------
# Error handling
# ---------------------------------------------------------------------------


def test_top_level_error_marks_state():
    """Exception in main try (state lookup ok) → phase='fresh', status='error'."""
    states = {}
    _seed_state('herr', states, tracks=[_track()])
    deps = _build_deps(states=states)
    deps.get_active_discovery_source = lambda: (_ for _ in ()).throw(RuntimeError("boom"))
    db.run_beatport_discovery_worker('herr', deps)
    assert states['herr']['phase'] == 'fresh'
    assert states['herr']['status'] == 'error'
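

def test_per_track_error_appends_error_entry():
    """Per-track exception → 'error' result entry appended, loop continues."""
    states = {}
    _seed_state('h10', states, tracks=[_track('A'), _track('B')])
    deps = _build_deps(states=states)
    call_count = [0]

    def raising_clean(text):
        # Raise on the first call only. clean_beatport_text runs outside the
        # per-query try/except, so the error reaches the per-track handler.
        # (The scorer is never called here: the fake search returns no results.)
        call_count[0] += 1
        if call_count[0] == 1:
            raise RuntimeError("clean boom")
        return (text or '').strip()

    deps.clean_beatport_text = raising_clean
    db.run_beatport_discovery_worker('h10', deps)
    results = states['h10']['discovery_results']
    # First track hits the per-track handler; second takes the Wing It path
    assert len(results) == 2
    assert results[0]['status'] == 'error'
    assert results[0]['error'] == 'clean boom'
    assert results[1]['status_class'] == 'wing-it'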


# ---------------------------------------------------------------------------
# Artist normalization (Beatport "CID,Taylr Renee" comma-split)
# ---------------------------------------------------------------------------


def test_comma_separated_artists_split_to_first():
    """Beatport 'CID,Taylr Renee' single-string-artists → first artist used."""
    states = {}
    track = _track(name='ABC', artists=['CID,Taylr Renee'])
    _seed_state('h11', states, tracks=[track])
    deps = _build_deps(states=states)
    db.run_beatport_discovery_worker('h11', deps)
    result = states['h11']['discovery_results'][0]
    assert result['beatport_track']['artist'] == 'CID'

@@ -38366,329 +38366,38 @@ def clean_beatport_text(text):
    return text
def _run_beatport_discovery_worker(url_hash):
    """Background worker for Beatport discovery process (Spotify preferred, iTunes fallback)"""
    _ew_state = {}
    try:
        _ew_state = _pause_enrichment_workers('Beatport discovery')
        state = beatport_chart_states[url_hash]
        chart = state['chart']
        tracks = chart['tracks']
        # Determine which provider to use
        discovery_source = _get_active_discovery_source()
        use_spotify = (discovery_source == 'spotify') and spotify_client and spotify_client.is_spotify_authenticated()
        # Initialize fallback client if needed
        itunes_client_instance = None
        if not use_spotify:
            itunes_client_instance = _get_metadata_fallback_client()
        logger.info(f"Starting {discovery_source.upper()} discovery for {len(tracks)} Beatport tracks...")
        # Store discovery source in state for frontend
        state['discovery_source'] = discovery_source
        # Process each track for discovery
        for i, track in enumerate(tracks):
            try:
                # Check for cancellation
                if state.get('phase') != 'discovering':
                    logger.warning(f"Beatport discovery cancelled (phase changed to '{state.get('phase')}')")
                    return
                # Update progress
                state['discovery_progress'] = int((i / len(tracks)) * 100)
                # Get track info from Beatport data (frontend sends 'name' and 'artists' fields)
                track_title = clean_beatport_text(track.get('name', 'Unknown Title'))
                track_artists = track.get('artists', ['Unknown Artist'])
                # Handle artists - could be a list or string
                if isinstance(track_artists, list):
                    if len(track_artists) > 0 and isinstance(track_artists[0], str):
                        # Handle case like ["CID,Taylr Renee"] - split on comma and clean
                        track_artist = clean_beatport_text(track_artists[0].split(',')[0].strip())
                    else:
                        track_artist = clean_beatport_text(track_artists[0] if track_artists else 'Unknown Artist')
                else:
                    track_artist = clean_beatport_text(str(track_artists))
                logger.debug(f"Searching {discovery_source.upper()} for: '{track_artist}' - '{track_title}'")
                # Check discovery cache first
                cache_key = _get_discovery_cache_key(track_title, track_artist)
                try:
                    cache_db = get_database()
                    cached_match = cache_db.get_discovery_cache_match(cache_key[0], cache_key[1], discovery_source)
                    if cached_match and _validate_discovery_cache_artist(track_artist, cached_match):
                        logger.debug(f"CACHE HIT [{i+1}/{len(tracks)}]: {track_artist} - {track_title}")
                        # Convert artists from ['str'] to [{'name': 'str'}] for Beatport frontend format
                        beatport_artists = cached_match.get('artists', [])
                        if beatport_artists and isinstance(beatport_artists[0], str):
                            cached_match['artists'] = [{'name': a} for a in beatport_artists]
                        result_entry = {
                            'index': i,
                            'beatport_track': {
                                'title': track_title,
                                'artist': track_artist
                            },
                            'status': 'found',
                            'status_class': 'found',
                            'discovery_source': discovery_source,
                            'spotify_data': cached_match
                        }
                        state['spotify_matches'] += 1
                        state['discovery_results'].append(result_entry)
                        continue
                except Exception as cache_err:
                    logger.error(f"Cache lookup error: {cache_err}")
                # Use matching engine for track matching
                found_track = None
                best_confidence = 0.0
                best_raw_track = None
                min_confidence = 0.9  # Higher threshold for Beatport to avoid bad matches
                # Generate search queries using matching engine (with fallback)
                try:
                    temp_track = type('TempTrack', (), {
                        'name': track_title,
                        'artists': [track_artist],
                        'album': None
                    })()
                    search_queries = matching_engine.generate_download_queries(temp_track)
                    logger.debug(f"Generated {len(search_queries)} search queries using matching engine")
                except Exception as e:
                    logger.error(f"Matching engine failed for Beatport, falling back to basic queries: {e}")
                    if use_spotify:
                        search_queries = [
                            f"{track_artist} {track_title}",
                            f'artist:"{track_artist}" track:"{track_title}"',
                            f'"{track_artist}" "{track_title}"'
                        ]
                    else:
                        search_queries = [
                            f"{track_artist} {track_title}",
                            f"{track_title} {track_artist}",
                            track_title
                        ]
                for query_idx, search_query in enumerate(search_queries):
                    try:
                        logger.debug(f"Query {query_idx + 1}/{len(search_queries)}: {search_query} ({discovery_source.upper()})")
                        search_results = None
                        if use_spotify and not _spotify_rate_limited():
                            search_results = spotify_client.search_tracks(search_query, limit=10)
                        else:
                            search_results = itunes_client_instance.search_tracks(search_query, limit=10)
                        if not search_results:
                            continue
                        # Score all results using the matching engine
                        match, confidence, match_idx = _discovery_score_candidates(
                            track_title, track_artist, 0, search_results
                        )
                        if match and confidence > best_confidence and confidence >= min_confidence:
                            best_confidence = confidence
                            found_track = match
                            if use_spotify and match.id:
                                _cache = get_metadata_cache()
                                best_raw_track = _cache.get_entity('spotify', 'track', match.id)
                            else:
                                best_raw_track = None
                            logger.debug(f"New best Beatport match: {match.artists[0]} - {match.name} (confidence: {confidence:.3f})")
                        if best_confidence >= 0.9:
                            logger.debug(f"High confidence match found ({best_confidence:.3f}), stopping search")
                            break
                    except Exception as e:
                        logger.debug(f"Error in {discovery_source.upper()} search for query '{search_query}': {e}")
                        continue
                # Strategy 4: Extended search with higher limit (last resort)
                if not found_track:
                    logger.debug("Beatport Strategy 4: Extended search with limit=50")
query = f"{track_artist} {track_title}"
if use_spotify:
extended_results = spotify_client.search_tracks(query, limit=50)
else:
extended_results = itunes_client_instance.search_tracks(query, limit=50)
if extended_results:
match, confidence, _ = _discovery_score_candidates(
track_title, track_artist, 0, extended_results
)
if match and confidence >= min_confidence:
found_track = match
best_confidence = confidence
logger.debug(f"Strategy 4 Beatport match (extended): {match.artists[0]} - {match.name} (confidence: {confidence:.3f})")
# Beatport discovery worker logic lives in core/discovery/beatport.py.
from core.discovery import beatport as _discovery_beatport
if found_track:
logger.info(f"Final Beatport match: {found_track.artists[0]} - {found_track.name} (confidence: {best_confidence:.3f})")
else:
logger.warning(f"No suitable match found (best confidence was {best_confidence:.3f}, required {min_confidence:.3f})")
# Create result entry
result_entry = {
'index': i, # Add index for frontend table row identification
'beatport_track': {
'title': track_title,
'artist': track_artist
},
'status': 'found' if found_track else 'not_found',
'status_class': 'found' if found_track else 'not-found',
'discovery_source': discovery_source,
'confidence': best_confidence
}
if found_track:
if use_spotify:
# SPOTIFY result formatting
# Debug: show available attributes
logger.debug(f"Spotify track attributes: {dir(found_track)}")
# Format artists correctly for frontend compatibility
formatted_artists = []
if isinstance(found_track.artists, list):
# If it's already a list of strings, convert to objects with 'name' property
for artist in found_track.artists:
if isinstance(artist, str):
formatted_artists.append({'name': artist})
else:
# If it's already an object, use as-is
formatted_artists.append(artist)
else:
# Single artist case
formatted_artists = [{'name': str(found_track.artists)}]
# Use full album object from raw Spotify data if available
album_data = best_raw_track.get('album', {}) if best_raw_track else {}
if not album_data:
# Fallback to string album name
album_data = {'name': found_track.album, 'album_type': 'album', 'release_date': getattr(found_track, 'release_date', '') or '', 'images': []}
result_entry['spotify_data'] = {
'name': found_track.name,
'artists': formatted_artists, # Now formatted as list of objects with 'name' property
'album': album_data, # Full album object with images
'id': found_track.id,
'source': 'spotify'
}
else:
# ITUNES result formatting
# Note: iTunes Track dataclass has 'artists' (list) and 'image_url', not 'artist' and 'artwork_url'
result_artists = found_track.artists if hasattr(found_track, 'artists') else []
result_artist = result_artists[0] if result_artists else 'Unknown'
result_name = found_track.name if hasattr(found_track, 'name') else 'Unknown'
album_name = found_track.album if hasattr(found_track, 'album') else 'Unknown Album'
image_url = found_track.image_url if hasattr(found_track, 'image_url') else ''
track_id = found_track.id if hasattr(found_track, 'id') else ''
# Format artists as list of objects for frontend compatibility
formatted_artists = [{'name': result_artist}]
# Build album data with artwork
album_data = {
'name': album_name,
'album_type': 'album',
'release_date': getattr(found_track, 'release_date', '') or '',
'images': [{'url': image_url, 'height': 300, 'width': 300}] if image_url else []
}
result_entry['spotify_data'] = { # Use same key for frontend compatibility
'name': result_name,
'artists': formatted_artists,
'album': album_data,
'id': track_id,
'source': discovery_source
}
state['spotify_matches'] += 1
# Save to discovery cache (normalize artists from [{name:str}] to [str] for canonical format)
if best_confidence >= 0.75:
try:
cache_data = dict(result_entry['spotify_data'])
cache_artists = cache_data.get('artists', [])
if cache_artists and isinstance(cache_artists[0], dict):
cache_data['artists'] = [a.get('name', '') for a in cache_artists]
# Extract image URL for discovery pool display
if 'image_url' not in cache_data:
_bp_album = cache_data.get('album', {})
_bp_images = _bp_album.get('images', []) if isinstance(_bp_album, dict) else []
cache_data['image_url'] = _bp_images[0].get('url', '') if _bp_images else ''
cache_db = get_database()
cache_db.save_discovery_cache_match(
cache_key[0], cache_key[1], discovery_source, best_confidence,
cache_data, track_title, track_artist
)
logger.debug(f"CACHE SAVED: {track_artist} - {track_title} (confidence: {best_confidence:.3f})")
except Exception as cache_err:
logger.error(f"Cache save error: {cache_err}")
# Auto Wing It fallback for unmatched tracks
if result_entry.get('status_class') == 'not-found':
bp_t = result_entry.get('beatport_track', {})
stub = _build_discovery_wing_it_stub(
bp_t.get('title', ''),
bp_t.get('artist', ''),
)
result_entry['status'] = 'found'
result_entry['status_class'] = 'wing-it'
result_entry['spotify_data'] = stub
result_entry['match_data'] = stub
result_entry['wing_it_fallback'] = True
result_entry['confidence'] = 0
state['spotify_matches'] = state.get('spotify_matches', 0) + 1
state['wing_it_count'] = state.get('wing_it_count', 0) + 1
state['discovery_results'].append(result_entry)
# Small delay to avoid rate limiting
time.sleep(0.1)
except Exception as e:
logger.error(f"Error processing Beatport track {i}: {e}")
# Add error result
state['discovery_results'].append({
'index': i, # Add index for frontend table row identification
'beatport_track': {
'title': track.get('name', 'Unknown'), # Changed from 'title' to 'name' to match track structure
'artist': track.get('artists', ['Unknown'])[0] if isinstance(track.get('artists'), list) else 'Unknown'
},
'status': 'error',
'status_class': 'error', # Add status class for CSS styling
'error': str(e),
'discovery_source': discovery_source
})
# Mark discovery as complete
state['discovery_progress'] = 100
state['phase'] = 'discovered'
state['status'] = 'discovered'
# Add activity for completion
chart_name = chart.get('name', 'Unknown Chart')
source_label = discovery_source.upper()
add_activity_item("", f"Beatport Discovery Complete ({source_label})",
f"'{chart_name}' - {state['spotify_matches']}/{len(tracks)} tracks found", "Now")
def _build_beatport_discovery_deps():
"""Build the BeatportDiscoveryDeps bundle from web_server.py globals on each call."""
return _discovery_beatport.BeatportDiscoveryDeps(
beatport_chart_states=beatport_chart_states,
spotify_client=spotify_client,
matching_engine=matching_engine,
pause_enrichment_workers=_pause_enrichment_workers,
resume_enrichment_workers=_resume_enrichment_workers,
get_active_discovery_source=_get_active_discovery_source,
get_metadata_fallback_client=_get_metadata_fallback_client,
clean_beatport_text=clean_beatport_text,
get_discovery_cache_key=_get_discovery_cache_key,
get_database=get_database,
validate_discovery_cache_artist=_validate_discovery_cache_artist,
spotify_rate_limited=_spotify_rate_limited,
discovery_score_candidates=_discovery_score_candidates,
get_metadata_cache=get_metadata_cache,
build_discovery_wing_it_stub=_build_discovery_wing_it_stub,
add_activity_item=add_activity_item,
sync_discovery_results_to_mirrored=_sync_discovery_results_to_mirrored,
)
logger.info(f"Beatport discovery complete ({source_label}): {state['spotify_matches']}/{len(tracks)} tracks found")
# Sync discovery results back to mirrored playlist
_sync_discovery_results_to_mirrored('beatport', url_hash, state.get('discovery_results', []), discovery_source, profile_id=state.get('_profile_id', 1))
def _run_beatport_discovery_worker(url_hash):
return _discovery_beatport.run_beatport_discovery_worker(
url_hash, _build_beatport_discovery_deps()
)
except Exception as e:
logger.error(f"Error in Beatport discovery worker: {e}")
if url_hash in beatport_chart_states:
beatport_chart_states[url_hash]['status'] = 'error'
beatport_chart_states[url_hash]['phase'] = 'fresh'
finally:
_resume_enrichment_workers(_ew_state, 'Beatport discovery')
@app.route('/api/beatport/sync/start/<url_hash>', methods=['POST'])
def start_beatport_sync(url_hash):
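
Note on the lift pattern: the new `_build_beatport_discovery_deps` / `_run_beatport_discovery_worker` pair above hands the worker its `web_server.py` globals as one bundle, rebuilt on each call. The sketch below is a minimal illustration of that shape, not the actual contents of `core/discovery/beatport.py`. `DiscoveryDeps`, `run_discovery_worker`, and the three trimmed fields are hypothetical names. The `pause_workers` signature is an assumption. Only the two-argument resume call and the error/finally handling mirror lines visible in this diff.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class DiscoveryDeps:
    """Hypothetical, trimmed-down stand-in for BeatportDiscoveryDeps."""
    chart_states: Dict[str, Dict[str, Any]]
    pause_workers: Callable[[str], Any]          # assumed signature
    resume_workers: Callable[[Any, str], None]   # mirrors the resume call in the diff


def run_discovery_worker(url_hash: str, deps: DiscoveryDeps) -> None:
    """Illustrative worker that only touches what arrives through deps."""
    paused = deps.pause_workers('Beatport discovery')
    try:
        state = deps.chart_states[url_hash]
        # The per-track discovery loop would run here.
        state['phase'] = 'discovered'
        state['status'] = 'discovered'
    except Exception:
        # Same reset as the original worker: back to 'fresh', flagged as error.
        if url_hash in deps.chart_states:
            deps.chart_states[url_hash]['status'] = 'error'
            deps.chart_states[url_hash]['phase'] = 'fresh'
    finally:
        # Enrichment workers are resumed on success and on failure.
        deps.resume_workers(paused, 'Beatport discovery')


# Usage with fakes: no web server, Spotify client, or database required.
calls = []
states = {'abc': {'phase': 'discovering', 'status': 'discovering'}}
deps = DiscoveryDeps(
    chart_states=states,
    pause_workers=lambda label: calls.append(('pause', label)) or {'paused': True},
    resume_workers=lambda token, label: calls.append(('resume', label)),
)
run_discovery_worker('abc', deps)
```

Because every collaborator is a field on the bundle, a test can swap in fakes as shown here, while the wrapper in `web_server.py` keeps the original entry-point name for existing callers.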
