You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/discovery/quality_scanner.py

663 lines
31 KiB

"""Background worker for the library quality scanner.

`run_quality_scanner(scope, profile_id, deps)` is the function the
quality-scanner endpoint kicks off in a thread to scan the library
for low-quality tracks (below the user's configured quality profile)
and add provider matches to the wishlist:

1. Reset scanner state, load quality profile + minimum acceptable tier.
2. Load tracks from DB based on scope:
   - 'watchlist' → tracks for watchlisted artists only.
   - other → all library tracks.
3. For each track:
   - Stop-request gate (state['status'] != 'running').
   - Quality-tier check via _get_quality_tier_from_extension(file_path).
   - Skip tracks meeting standards (tier_num <= min_acceptable_tier).
   - For low-quality tracks: matching_engine search query gen, score
     candidates against the configured metadata source priority
     (artist + title similarity, album-type bonus), pick best match >=
     0.7 confidence.
   - On match: add normalized track data to wishlist via
     `wishlist_service.add_track_to_wishlist` with
     source_type='quality_scanner' and a source_context that captures
     original file_path, format tier, bitrate, and match confidence.
4. After all tracks: status='finished', progress=100, activity feed
   entry, emit `quality_scan_completed` event for automation engine.
5. On critical exception: status='error', error message captured.
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, Optional
from core.metadata.registry import get_client_for_source, get_primary_source, get_source_priority
from core.metadata.types import Album
from core.wishlist.payloads import ensure_wishlist_track_format
logger = logging.getLogger(__name__)
# Provider name → typed ``Album`` constructor, following the same registry
# pattern as the metadata builders. ``_normalize_track_album`` looks the
# (lower-cased) provider up here and feeds the track's embedded ``album``
# dict through ``Album.from_<source>_dict()``; providers without an entry
# are handled by the legacy duck-typed extraction only.
_TYPED_ALBUM_CONVERTERS: Dict[str, Callable[[Dict[str, Any]], Album]] = {
    'spotify': Album.from_spotify_dict,
    'itunes': Album.from_itunes_dict,
    'deezer': Album.from_deezer_dict,
    'discogs': Album.from_discogs_dict,
    'musicbrainz': Album.from_musicbrainz_dict,
    'hydrabase': Album.from_hydrabase_dict,
    'qobuz': Album.from_qobuz_dict,
}
@dataclass
class QualityScannerDeps:
    """Collaborators and shared state handed to ``run_quality_scanner``."""

    # Mutable status dict the scanner writes to: status, phase, progress,
    # processed/total counters, results and error_message.
    quality_scanner_state: dict
    # Context manager guarding writes to quality_scanner_state
    # (a threading.Lock).
    quality_scanner_lock: Any
    # Mapping keyed by tier name; the scanner reads QUALITY_TIERS[name]['tier'].
    QUALITY_TIERS: dict
    # Supplies generate_download_queries(), normalize_string() and
    # similarity_score() for candidate matching.
    matching_engine: Any
    # Event sink; when truthy, emit('quality_scan_completed', {...}) is called.
    automation_engine: Any
    # Called with a file path; its result is unpacked as (tier_name, tier_num).
    get_quality_tier_from_extension: Callable
    # Called with four positional arguments to post the completion entry.
    add_activity_item: Callable
def _extract_lookup_value(value: Any, *names: str, default: Any = None) -> Any:
    """Return the first non-None field of ``value`` found under ``names``.

    ``value`` may be a dict (looked up by key) or any other object (looked
    up by attribute). ``None`` yields ``default``; a ``str``/``bytes`` value
    is returned unchanged without consulting ``names``. When no name
    resolves to a non-None value, ``default`` is returned.
    """
    if value is None:
        return default
    if isinstance(value, (str, bytes)):
        return value

    if isinstance(value, dict):
        found = (value[name] for name in names if name in value)
    else:
        found = (getattr(value, name, None) for name in names)
    return next((item for item in found if item is not None), default)
def _normalize_track_artists(track_item: Any) -> list[dict]:
    """Return the track's artists as a list of ``{'name': ...}`` dicts.

    ``track_item['artists']`` / ``track_item.artists`` may be a single
    string, a single dict, or an iterable of strings, dicts or objects.
    Entries without a usable name are dropped. Dict/object entries that
    carry ``images`` or ``image_url``/``artist_image_url`` also get
    ``images`` and ``image_url`` keys. When nothing usable is found the
    result is ``[{'name': 'Unknown Artist'}]``.
    """
    artists = _extract_lookup_value(track_item, 'artists', default=[]) or []
    if isinstance(artists, (str, bytes, dict)):
        artists = [artists]
    else:
        try:
            artists = list(artists)
        except TypeError:
            # A single non-iterable artist object.
            artists = [artists]

    normalized = []
    for artist in artists:
        is_plain_name = isinstance(artist, (str, bytes))
        if is_plain_name:
            artist_name = artist
        else:
            artist_name = _extract_lookup_value(artist, 'name', 'artist_name', 'title')
        if not artist_name:
            continue

        artist_data = {'name': str(artist_name)}
        # A bare name carries no image metadata. _extract_lookup_value()
        # returns str/bytes inputs unchanged, so looking up 'images' on a
        # plain name would hand the name itself back and record it as an
        # image URL.
        if not is_plain_name:
            artist_images = _normalize_image_entries(_extract_lookup_value(artist, 'images', default=[]))
            artist_image_url = _extract_lookup_value(artist, 'image_url', 'artist_image_url', default=None)
            if artist_image_url and not artist_images:
                artist_images = [{'url': str(artist_image_url)}]
            if artist_images:
                artist_data['images'] = artist_images
                artist_data['image_url'] = artist_images[0].get('url')
        normalized.append(artist_data)

    if not normalized:
        normalized.append({'name': 'Unknown Artist'})
    return normalized
def _normalize_image_entries(image_value: Any) -> list[dict]:
    """Normalize assorted image inputs into a list of ``{'url': ...}`` dicts.

    Accepts a single URL (``str``/``bytes``), a single image dict, or an
    iterable of those. Dict entries keep all of their keys and gain a
    ``'url'`` key taken from ``'url'`` or ``'image_url'``. Entries without
    a URL, entries of any other type, and repeated URLs are dropped.
    Returns ``[]`` for falsy or non-iterable input.
    """
    if not image_value:
        return []
    if isinstance(image_value, (dict, str, bytes)):
        candidates = [image_value]
    else:
        try:
            candidates = list(image_value)
        except TypeError:
            return []

    def _as_text(raw: Any) -> str:
        # str(b'...') would produce the repr "b'...'" rather than the URL.
        if isinstance(raw, bytes):
            return raw.decode('utf-8', errors='replace')
        return str(raw)

    normalized = []
    seen_urls = set()
    for image in candidates:
        if isinstance(image, dict):
            image_url = image.get('url') or image.get('image_url')
            if not image_url:
                continue
            image_dict = dict(image)
            image_dict['url'] = _as_text(image_url)
        elif isinstance(image, (str, bytes)):
            # Skip empty strings, matching how URL-less dicts are skipped.
            if not image:
                continue
            image_dict = {'url': _as_text(image)}
        else:
            continue
        if image_dict['url'] in seen_urls:
            continue
        seen_urls.add(image_dict['url'])
        normalized.append(image_dict)
    return normalized
def _normalize_track_album(track_item: Any, provider: Optional[str] = None) -> dict:
    """Normalize a track's embedded album blob into a flat dict.

    The result starts as a copy of the embedded ``album`` dict (or, for a
    non-dict album, a small dict built from its attributes). Missing fields
    are then filled in this order:

    1. When ``provider`` maps to a registered typed ``Album`` converter and
       the album is a dict, truthy ``name`` / ``album_type`` /
       ``total_tracks`` / ``release_date`` / ``id`` values from the typed
       album. A converter failure is logged at debug level and ignored.
    2. Track-level fallbacks (``album_name``, ``album_type``,
       ``total_tracks``/``track_count``, ``release_date``).

    ``images`` is always set (possibly to ``[]``) from the album or, failing
    that, the track; ``image_url`` and ``artists`` are filled when absent.
    A field counts as missing when its key is absent or holds ``None``.
    """
    album = _extract_lookup_value(track_item, 'album', default={})
    album_is_dict = isinstance(album, dict)
    if album_is_dict:
        album_data = dict(album)
    else:
        album_data = {
            'name': _extract_lookup_value(album, 'name', 'title', default=str(album) if album else '') or '',
            'album_type': _extract_lookup_value(album, 'album_type', default='album') or 'album',
            'total_tracks': _extract_lookup_value(album, 'total_tracks', 'track_count', default=0) or 0,
            'release_date': _extract_lookup_value(album, 'release_date', default='') or '',
        }

    def _backfill(key: str, value: Any) -> None:
        # dict.setdefault() treats a key holding None as already set, which
        # would leave provider-supplied nulls in place instead of filling them.
        if album_data.get(key) is None:
            album_data[key] = value

    if provider and album_is_dict:
        converter = _TYPED_ALBUM_CONVERTERS.get(provider.strip().lower())
        if converter is not None:
            try:
                typed_album = converter(album)
                for key in ('name', 'album_type', 'total_tracks', 'release_date', 'id'):
                    typed_value = getattr(typed_album, key)
                    if typed_value:
                        _backfill(key, typed_value)
            except Exception as exc:
                logger.debug(
                    "Typed album converter failed for provider %s in quality "
                    "scanner normalize, falling back to legacy: %s", provider, exc,
                )

    # Legacy track-level fallbacks for anything still missing.
    _backfill('name', _extract_lookup_value(track_item, 'album_name', default='Unknown Album') or 'Unknown Album')
    _backfill('album_type', _extract_lookup_value(track_item, 'album_type', default='album') or 'album')
    _backfill('total_tracks', _extract_lookup_value(track_item, 'total_tracks', 'track_count', default=0) or 0)
    _backfill('release_date', _extract_lookup_value(track_item, 'release_date', default='') or '')

    # Artwork: album-level images first, then single-URL album keys, then
    # whatever the track itself carries.
    album_images = _normalize_image_entries(album_data.get('images'))
    if not album_images and album_is_dict:
        album_images = _normalize_image_entries(
            album.get('images')
            or album.get('image_url')
            or album.get('album_cover_url')
            or album.get('cover_url')
        )
    if not album_images:
        album_images = _normalize_image_entries(
            _extract_lookup_value(track_item, 'images', default=None)
            or _extract_lookup_value(track_item, 'image_url', default=None)
            or _extract_lookup_value(track_item, 'album_cover_url', default=None)
            or _extract_lookup_value(track_item, 'cover_url', default=None)
        )
    if album_images:
        album_data['images'] = album_images
        _backfill('image_url', album_images[0].get('url'))
    else:
        album_data['images'] = []

    _backfill('artists', _normalize_track_artists(track_item))
    return album_data
def _normalize_track_match(track_item: Any, provider: str) -> dict:
    """Build the wishlist payload for a matched provider track.

    Copies the core track fields off ``track_item`` (dict or object),
    normalizes its artists and album, tags the payload with ``provider``
    under both ``provider`` and ``source``, and falls back to the first
    album image when the track has no image URL of its own. The payload is
    returned via ``ensure_wishlist_track_format``.
    """
    def field(*names: str, default: Any = None) -> Any:
        return _extract_lookup_value(track_item, *names, default=default)

    track_data = {
        'id': field('id', 'track_id', default='') or '',
        'name': field('name', 'title', default='Unknown Track') or 'Unknown Track',
        'artists': _normalize_track_artists(track_item),
        'album': _normalize_track_album(track_item, provider=provider),
        'image_url': field('image_url', 'album_cover_url', default=None),
        'duration_ms': field('duration_ms', default=0) or 0,
        'track_number': field('track_number', default=1) or 1,
        'disc_number': field('disc_number', default=1) or 1,
        'preview_url': field('preview_url', default=None),
        'external_urls': field('external_urls', default={}) or {},
        'popularity': field('popularity', default=0) or 0,
        'provider': provider,
        'source': provider,
    }

    if not track_data['image_url']:
        # Borrow the album's first image when the track has none.
        album_entry = track_data['album']
        images = album_entry.get('images') if isinstance(album_entry, dict) else []
        first_image = images[0] if isinstance(images, list) and images else None
        if isinstance(first_image, dict):
            track_data['image_url'] = first_image.get('url')

    return ensure_wishlist_track_format(track_data)
def _track_name(track_item: Any) -> str:
    """Return the track's ``name``/``title`` as a string, or 'Unknown Track'."""
    fallback = 'Unknown Track'
    title = _extract_lookup_value(track_item, 'name', 'title', default=fallback)
    return str(title or fallback)
def _track_artist_names(track_item: Any) -> list[str]:
    """Return the names of the track's artists as plain strings.

    ``artists`` may be one string, one dict, or an iterable of strings,
    dicts or objects; entries with no usable name are skipped. The result
    may be empty.
    """
    raw_artists = _extract_lookup_value(track_item, 'artists', default=[]) or []
    if isinstance(raw_artists, (str, bytes, dict)):
        entries = [raw_artists]
    else:
        try:
            entries = list(raw_artists)
        except TypeError:
            # A single non-iterable artist object.
            entries = [raw_artists]

    names = []
    for entry in entries:
        name = _extract_lookup_value(entry, 'name', 'artist_name', 'title')
        if not name and isinstance(entry, (str, bytes)):
            name = entry
        if not name:
            continue
        names.append(str(name))
    return names
def _search_tracks_for_source(source: str, query: str, limit: int = 5, client: Any = None):
    """Search one metadata source for tracks, never raising.

    Args:
        source: Metadata source name; ``'spotify'`` is additionally asked
            not to use its own fallback (``allow_fallback=False``).
        query: Search string passed to ``client.search_tracks``.
        limit: Maximum number of results requested.
        client: Client to use; looked up via ``get_client_for_source`` when
            omitted.

    Returns:
        The client's results, or ``[]`` when there is no usable client, the
        client returns nothing, or the search raises (logged at debug level).
    """
    if client is None:
        client = get_client_for_source(source)
    if not client or not hasattr(client, 'search_tracks'):
        return []

    try:
        if source == 'spotify':
            try:
                return client.search_tracks(query, limit=limit, allow_fallback=False) or []
            except TypeError:
                # Client does not accept allow_fallback; retry without it
                # below. Only this call is retried, so other sources are
                # never queried twice for the same failure.
                pass
        return client.search_tracks(query, limit=limit) or []
    except Exception as exc:
        logger.debug("Could not search %s for %s: %s", source, query, exc)
        return []
def _resolve_min_acceptable_tier(preferred_qualities: dict, quality_tiers: dict) -> int:
    """Return the smallest tier number among the enabled profile qualities.

    Returns 999 when no enabled quality maps to a known tier, in which case
    every track passes the ``tier_num <= min_acceptable_tier`` check.
    """
    # Map quality profile names to tier names
    tier_map = {
        'flac': 'lossless',
        'mp3_320': 'low_lossy',
        'mp3_256': 'low_lossy',
        'mp3_192': 'low_lossy'
    }
    min_acceptable_tier = 999
    for quality_name, quality_config in preferred_qualities.items():
        if not quality_config.get('enabled', False):
            continue
        tier_name = tier_map.get(quality_name)
        if tier_name:
            tier_num = quality_tiers[tier_name]['tier']
            min_acceptable_tier = min(min_acceptable_tier, tier_num)
    return min_acceptable_tier


def _fetch_track_rows(db: Any, query: str, params: Any = None) -> list:
    """Run ``query`` on a fresh DB connection and return all rows.

    The connection is closed even when the query raises.
    """
    conn = db._get_connection()
    try:
        cursor = conn.execute(query) if params is None else conn.execute(query, params)
        return cursor.fetchall()
    finally:
        conn.close()


def _score_candidate(matching_engine: Any, artist_name: str, title: str,
                     provider_track: Any, provider_artists: list) -> float:
    """Score a provider track against the library track.

    The score is 50% best artist similarity plus 50% title similarity, with
    a small bonus for album (+0.02) and EP (+0.01) releases.
    """
    artist_confidence = 0.0
    for result_artist in provider_artists:
        artist_sim = matching_engine.similarity_score(
            matching_engine.normalize_string(artist_name),
            matching_engine.normalize_string(result_artist)
        )
        artist_confidence = max(artist_confidence, artist_sim)

    title_confidence = matching_engine.similarity_score(
        matching_engine.normalize_string(title),
        matching_engine.normalize_string(_track_name(provider_track))
    )

    # Combined confidence (50% artist + 50% title)
    combined_confidence = (artist_confidence * 0.5 + title_confidence * 0.5)

    # Small bonus for album tracks over singles
    album_type = _extract_lookup_value(provider_track, 'album_type', default='') or ''
    if album_type == 'album':
        combined_confidence += 0.02
    elif album_type == 'ep':
        combined_confidence += 0.01
    return combined_confidence


def run_quality_scanner(scope='watchlist', profile_id=1, deps: QualityScannerDeps = None):
    """Scan library tracks and wishlist better-quality matches.

    Args:
        scope: ``'watchlist'`` scans only tracks of watchlisted artists for
            ``profile_id``; any other value scans every track with a file
            path.
        profile_id: Profile used for the watchlist lookup and wishlist add.
        deps: Shared state, lock and collaborators. Required in practice;
            every step goes through it.

    Progress and outcome are reported through ``deps.quality_scanner_state``
    (under ``deps.quality_scanner_lock``); nothing is returned. The scan
    stops early when another party sets ``state['status']`` to anything
    other than ``'running'``. Unexpected errors set ``status='error'`` and
    record the message instead of propagating.
    """
    from core.wishlist_service import get_wishlist_service
    from database.music_database import MusicDatabase

    try:
        with deps.quality_scanner_lock:
            deps.quality_scanner_state["status"] = "running"
            deps.quality_scanner_state["phase"] = "Initializing scan..."
            deps.quality_scanner_state["progress"] = 0
            deps.quality_scanner_state["processed"] = 0
            deps.quality_scanner_state["total"] = 0
            deps.quality_scanner_state["quality_met"] = 0
            deps.quality_scanner_state["low_quality"] = 0
            deps.quality_scanner_state["matched"] = 0
            deps.quality_scanner_state["results"] = []
            deps.quality_scanner_state["error_message"] = ""

        logger.info(f"[Quality Scanner] Starting scan with scope: {scope}")

        # Get database instance
        db = MusicDatabase()

        # Get quality profile to determine preferred quality
        quality_profile = db.get_quality_profile()
        preferred_qualities = quality_profile.get('qualities', {})

        # Determine minimum acceptable tier based on enabled qualities
        min_acceptable_tier = _resolve_min_acceptable_tier(preferred_qualities, deps.QUALITY_TIERS)
        logger.info(f"[Quality Scanner] Minimum acceptable tier: {min_acceptable_tier}")

        # Get tracks to scan based on scope
        with deps.quality_scanner_lock:
            deps.quality_scanner_state["phase"] = "Loading tracks from database..."

        select_tracks = (
            "SELECT t.id, t.title, t.artist_id, t.album_id, t.file_path, t.bitrate, a.name as artist_name, al.title as album_title "
            "FROM tracks t "
            "JOIN artists a ON t.artist_id = a.id "
            "JOIN albums al ON t.album_id = al.id "
        )

        if scope == 'watchlist':
            # Get watchlist artists
            watchlist_artists = db.get_watchlist_artists(profile_id=profile_id)
            if not watchlist_artists:
                with deps.quality_scanner_lock:
                    deps.quality_scanner_state["status"] = "finished"
                    deps.quality_scanner_state["phase"] = "No watchlist artists found"
                    deps.quality_scanner_state["error_message"] = "Please add artists to watchlist first"
                logger.warning("[Quality Scanner] No watchlist artists found")
                return

            # Get artist names from watchlist
            artist_names = [artist.artist_name for artist in watchlist_artists]
            logger.info(f"[Quality Scanner] Scanning {len(artist_names)} watchlist artists")

            # Get all tracks for these artists by name. Only the "?"
            # placeholders are interpolated; names are bound as parameters.
            placeholders = ','.join(['?' for _ in artist_names])
            tracks_to_scan = _fetch_track_rows(
                db,
                select_tracks + f"WHERE a.name IN ({placeholders}) AND t.file_path IS NOT NULL",
                artist_names,
            )
        else:
            # Scan all library tracks
            with deps.quality_scanner_lock:
                deps.quality_scanner_state["phase"] = "Loading all library tracks..."
            tracks_to_scan = _fetch_track_rows(
                db,
                select_tracks + "WHERE t.file_path IS NOT NULL",
            )

        total_tracks = len(tracks_to_scan)
        logger.info(f"[Quality Scanner] Found {total_tracks} tracks to scan")

        with deps.quality_scanner_lock:
            deps.quality_scanner_state["total"] = total_tracks
            deps.quality_scanner_state["phase"] = f"Scanning {total_tracks} tracks..."

        source_priority = get_source_priority(get_primary_source())
        if not source_priority:
            with deps.quality_scanner_lock:
                deps.quality_scanner_state["status"] = "error"
                deps.quality_scanner_state["phase"] = "No metadata provider available"
                deps.quality_scanner_state["error_message"] = "No metadata provider is available for quality scanning"
            logger.info("[Quality Scanner] No metadata provider available")
            return

        logger.info("[Quality Scanner] Using metadata source priority: %s", source_priority)

        wishlist_service = get_wishlist_service()
        add_to_wishlist = getattr(wishlist_service, 'add_track_to_wishlist', None)
        if add_to_wishlist is None:
            add_to_wishlist = getattr(wishlist_service, 'add_spotify_track_to_wishlist', None)
        if add_to_wishlist is None:
            raise AttributeError("Wishlist service does not expose an add-to-wishlist method")

        # Scan each track
        for idx, track_row in enumerate(tracks_to_scan, 1):
            # Check for stop request
            if deps.quality_scanner_state.get('status') != 'running':
                logger.info(f"[Quality Scanner] Stop requested, halting at track {idx}/{total_tracks}")
                break

            try:
                track_id, title, artist_id, album_id, file_path, bitrate, artist_name, album_title = track_row

                # Check quality tier
                tier_name, tier_num = deps.get_quality_tier_from_extension(file_path)

                # Update progress
                with deps.quality_scanner_lock:
                    deps.quality_scanner_state["processed"] = idx
                    deps.quality_scanner_state["progress"] = (idx / total_tracks) * 100
                    deps.quality_scanner_state["phase"] = f"Scanning: {artist_name} - {title}"

                # Check if meets quality standards
                if tier_num <= min_acceptable_tier:
                    # Quality met
                    with deps.quality_scanner_lock:
                        deps.quality_scanner_state["quality_met"] += 1
                    continue

                # Low quality track found
                with deps.quality_scanner_lock:
                    deps.quality_scanner_state["low_quality"] += 1

                logger.info(f"[Quality Scanner] Low quality: {artist_name} - {title} ({tier_name}, {file_path})")

                # Attempt to match using the active metadata provider
                matched = False
                matched_track_data = None
                best_source = None
                attempted_any_provider = False

                try:
                    # Generate search queries using matching engine
                    temp_track = type('TempTrack', (), {
                        'name': title,
                        'artists': [artist_name],
                        'album': album_title
                    })()
                    search_queries = deps.matching_engine.generate_download_queries(temp_track)
                    logger.info(f"[Quality Scanner] Generated {len(search_queries)} search queries for {artist_name} - {title}")

                    # Find best match using confidence scoring
                    best_match = None
                    best_confidence = 0.0
                    min_confidence = 0.7  # Match existing standard

                    for search_query in search_queries:
                        try:
                            for source in source_priority:
                                client = get_client_for_source(source)
                                if not client or not hasattr(client, 'search_tracks'):
                                    continue

                                attempted_any_provider = True
                                provider_matches = _search_tracks_for_source(source, search_query, limit=5, client=client)
                                time.sleep(0.5)  # Rate limit metadata API calls

                                if not provider_matches:
                                    continue

                                # Score each result using matching engine
                                for provider_track in provider_matches:
                                    try:
                                        provider_artists = _track_artist_names(provider_track)
                                        combined_confidence = _score_candidate(
                                            deps.matching_engine, artist_name, title,
                                            provider_track, provider_artists,
                                        )

                                        candidate_artist = provider_artists[0] if provider_artists else 'Unknown Artist'
                                        candidate_name = _track_name(provider_track)
                                        logger.info(
                                            f"[Quality Scanner] Candidate ({source}): '{candidate_artist}' - "
                                            f"'{candidate_name}' (confidence: {combined_confidence:.3f})"
                                        )

                                        # Update best match if this is better
                                        if combined_confidence > best_confidence and combined_confidence >= min_confidence:
                                            best_confidence = combined_confidence
                                            best_match = provider_track
                                            best_source = source
                                            logger.info(
                                                f"[Quality Scanner] New best match ({source}): {candidate_artist} - "
                                                f"{candidate_name} (confidence: {combined_confidence:.3f})"
                                            )
                                    except Exception as e:
                                        logger.error(f"[Quality Scanner] Error scoring result: {e}")
                                        continue

                                # If we found a very high confidence match, stop searching this query.
                                # This leaves the provider loop only; remaining queries still run.
                                if best_confidence >= 0.9:
                                    logger.info(f"[Quality Scanner] High confidence match found ({best_confidence:.3f}), stopping search")
                                    break
                        except Exception as e:
                            logger.debug(f"[Quality Scanner] Error searching with query '{search_query}': {e}")
                            continue

                    # Only treat "no provider attempted" as fatal when at
                    # least one query actually ran; an empty query list says
                    # nothing about provider availability.
                    if search_queries and not attempted_any_provider:
                        with deps.quality_scanner_lock:
                            deps.quality_scanner_state["status"] = "error"
                            deps.quality_scanner_state["phase"] = "No metadata provider available"
                            deps.quality_scanner_state["error_message"] = "No metadata provider is available for quality scanning"
                        logger.info("[Quality Scanner] No metadata provider available")
                        return

                    # Process best match
                    if best_match:
                        matched = True
                        final_artists = _track_artist_names(best_match)
                        final_artist = final_artists[0] if final_artists else 'Unknown Artist'
                        final_name = _track_name(best_match)
                        final_source = best_source or 'metadata'
                        logger.info(
                            f"[Quality Scanner] Final match ({final_source}): {final_artist} - "
                            f"{final_name} (confidence: {best_confidence:.3f})"
                        )

                        # Build normalized track data for wishlist
                        matched_track_data = _normalize_track_match(best_match, final_source)

                        # Add to wishlist
                        source_context = {
                            'quality_scanner': True,
                            'original_file_path': file_path,
                            'original_format': tier_name,
                            'original_bitrate': bitrate,
                            'match_confidence': best_confidence,
                            'scan_date': datetime.now().isoformat()
                        }

                        success = add_to_wishlist(
                            track_data=matched_track_data,
                            failure_reason=f"Low quality - {tier_name.replace('_', ' ').title()} format",
                            source_type='quality_scanner',
                            source_context=source_context,
                            profile_id=profile_id
                        )

                        if success:
                            with deps.quality_scanner_lock:
                                deps.quality_scanner_state["matched"] += 1
                            logger.info(f"[Quality Scanner] Matched and added to wishlist: {artist_name} - {title}")
                        else:
                            logger.error(f"[Quality Scanner] Failed to add to wishlist: {artist_name} - {title}")
                    else:
                        logger.warning(
                            f"[Quality Scanner] No suitable metadata match found "
                            f"(best confidence: {best_confidence:.3f}, required: {min_confidence:.3f})"
                        )

                except Exception as matching_error:
                    logger.error(f"[Quality Scanner] Matching error for {artist_name} - {title}: {matching_error}")

                # Store result
                result_entry = {
                    'track_id': track_id,
                    'title': title,
                    'artist': artist_name,
                    'album': album_title,
                    'file_path': file_path,
                    'current_format': tier_name,
                    'bitrate': bitrate,
                    'matched': matched,
                    'match_id': matched_track_data['id'] if matched_track_data else None,
                    'provider': best_source if matched else None,
                    'spotify_id': matched_track_data['id'] if matched_track_data else None,
                }

                with deps.quality_scanner_lock:
                    deps.quality_scanner_state["results"].append(result_entry)

                if not matched:
                    logger.warning(f"[Quality Scanner] No metadata match found for: {artist_name} - {title}")

            except Exception as track_error:
                logger.error(f"[Quality Scanner] Error processing track: {track_error}")
                continue

        # Scan complete (don't overwrite the phase if already stopped by user)
        with deps.quality_scanner_lock:
            was_stopped = deps.quality_scanner_state["status"] != "running"
            deps.quality_scanner_state["status"] = "finished"
            deps.quality_scanner_state["progress"] = 100
            if not was_stopped:
                deps.quality_scanner_state["phase"] = "Scan complete"

        logger.info(f"[Quality Scanner] Scan {'stopped' if was_stopped else 'complete'}: {deps.quality_scanner_state['processed']} processed, "
                    f"{deps.quality_scanner_state['low_quality']} low quality, {deps.quality_scanner_state['matched']} matched to metadata providers")

        # Add activity
        deps.add_activity_item("", "Quality Scan Complete",
                               f"{deps.quality_scanner_state['matched']} tracks added to wishlist", "Now")

        # Best-effort notification; a failing automation hook must not turn
        # a finished scan into an error.
        try:
            if deps.automation_engine:
                deps.automation_engine.emit('quality_scan_completed', {
                    'quality_met': str(deps.quality_scanner_state.get('quality_met', 0)),
                    'low_quality': str(deps.quality_scanner_state.get('low_quality', 0)),
                    'total_scanned': str(deps.quality_scanner_state.get('processed', 0)),
                })
        except Exception as e:
            logger.debug("emit quality_scan_completed failed: %s", e)

    except Exception as e:
        logger.error(f"[Quality Scanner] Critical error: {e}")
        import traceback
        traceback.print_exc()
        with deps.quality_scanner_lock:
            deps.quality_scanner_state["status"] = "error"
            deps.quality_scanner_state["error_message"] = str(e)
            deps.quality_scanner_state["phase"] = f"Error: {str(e)}"