diff --git a/core/repair_jobs/unknown_artist_fixer.py b/core/repair_jobs/unknown_artist_fixer.py index c81663c1..01086989 100644 --- a/core/repair_jobs/unknown_artist_fixer.py +++ b/core/repair_jobs/unknown_artist_fixer.py @@ -8,9 +8,8 @@ import os import re import shutil import sys -import time -from core.metadata_service import get_client_for_source, get_primary_client, get_primary_source +from core.metadata_service import get_client_for_source, get_primary_source, get_source_priority from core.repair_jobs import register_job from core.repair_jobs.base import JobContext, JobResult, RepairJob from utils.logging_config import get_logger @@ -18,6 +17,8 @@ from utils.logging_config import get_logger logger = get_logger("repair_job.unknown_artist_fixer") _UNKNOWN_NAMES = {'unknown artist', 'unknown', ''} +_TRACK_ID_SOURCES = {'spotify', 'deezer', 'itunes'} +_TITLE_SEARCH_SOURCES = {'spotify', 'deezer', 'itunes', 'hydrabase'} # Sidecar extensions to move alongside audio files _SIDECAR_EXTS = {'.lrc', '.jpg', '.jpeg', '.png', '.nfo', '.txt', '.cue'} @@ -249,6 +250,7 @@ class UnknownArtistFixerJob(RepairJob): Returns dict with artist, album, track_number, year, etc. or None.""" title = track['title'] or '' + primary_source = get_primary_source() # Priority 1: Read embedded file tags try: @@ -269,96 +271,160 @@ class UnknownArtistFixerJob(RepairJob): except Exception as e: logger.debug(f"Failed to read tags from {resolved_path}: {e}") - # Priority 2: Look up by source track ID using the appropriate client. - # Try the primary source's ID first, then fall back to any available ID - # with its matching client so we never pass a Deezer/iTunes ID to Spotify - # (or vice-versa). - _primary = get_primary_source() - _id_candidates = [] - for _src in [_primary] + [s for s in ('spotify', 'deezer', 'itunes') if s != _primary]: - _tid = track.get(f'{_src}_track_id') - if _tid: - _id_candidates.append((_src, _tid)) - source_id = None - _lookup_client = None - for _src, _tid in _id_candidates: - _c = get_client_for_source(_src) - if _c: - source_id = _tid - _lookup_client = _c - break - if source_id and _lookup_client: + # Priority 2: Look up by source track ID + for source, source_id in self._iter_source_track_ids(track, primary_source): + client = get_client_for_source(source) + if not client or not hasattr(client, 'get_track_details'): + continue try: - details = _lookup_client.get_track_details(str(source_id)) - if details and details.get('primary_artist'): - artist = details['primary_artist'] - if artist.lower() not in _UNKNOWN_NAMES: - album = details.get('album', {}) - album_name = album.get('name', '') if isinstance(album, dict) else str(album) - return { - 'artist': artist, - 'album': album_name, - 'title': details.get('name', title), - 'track_number': details.get('track_number'), - 'disc_number': details.get('disc_number', 1), - 'year': (album.get('release_date', '') or '')[:4] if isinstance(album, dict) else '', - 'image_url': album.get('images', [{}])[0].get('url', '') if isinstance(album, dict) and album.get('images') else '', - 'source': 'track_id_lookup', - 'confidence': 0.95, - } + details = client.get_track_details(str(source_id)) + corrected = self._build_corrected_metadata( + details, + fallback_title=title, + source=f"{source}_track_id_lookup", + confidence=0.95, + ) + if corrected: + return corrected except Exception as e: - logger.debug(f"Track ID lookup failed for {source_id}: {e}") + logger.debug(f"Track ID lookup failed for {source} {source_id}: {e}") + + # Priority 3: Search by title + if title: + for source in self._iter_source_priority(primary_source, _TITLE_SEARCH_SOURCES): + client = get_client_for_source(source) + if not client or not hasattr(client, 'search_tracks'): + continue + try: + results = client.search_tracks(title, limit=5) + if not results: + continue - # Priority 3: Search by title using the configured primary metadata source - _search_client = get_primary_client() - if title and _search_client: - try: - results = _search_client.search_tracks(title, limit=5) - if results: - # Score candidates - from difflib import SequenceMatcher - best = None - best_score = 0 - for r in results: - name_sim = SequenceMatcher(None, title.lower(), r.name.lower()).ratio() - # Boost if album matches - album_name = r.album if hasattr(r, 'album') else '' - if album_name and track.get('album_title'): - album_sim = SequenceMatcher(None, track['album_title'].lower(), album_name.lower()).ratio() - name_sim = (name_sim * 0.7) + (album_sim * 0.3) - if name_sim > best_score: - best_score = name_sim - best = r - - if best and best_score >= 0.7: - artist = best.artists[0] if best.artists else '' - if artist and artist.lower() not in _UNKNOWN_NAMES: - # Get full details for track_number + best, best_score = self._pick_best_track_candidate(title, track.get('album_title'), results) + if not best or best_score < 0.7: + continue + + full_details = None + if hasattr(client, 'get_track_details') and getattr(best, 'id', None): + try: + full_details = client.get_track_details(str(best.id)) + except Exception: full_details = None - try: - full_details = _search_client.get_track_details(best.id) - except Exception: - pass - album_data = full_details.get('album', {}) if full_details else {} - return { - 'artist': artist, - 'album': best.album if hasattr(best, 'album') else '', - 'title': best.name, - 'track_number': full_details.get('track_number') if full_details else None, - 'disc_number': full_details.get('disc_number', 1) if full_details else 1, - 'year': (album_data.get('release_date', '') or '')[:4] if isinstance(album_data, dict) else '', - 'image_url': getattr(best, 'image_url', '') or '', - 'source': 'title_search', - 'confidence': round(best_score, 3), - } - except Exception as e: - logger.debug(f"Title search failed for '{title}': {e}") - # Rate limit courtesy - if context.sleep_or_stop(0.2): - return None + + corrected = self._build_corrected_metadata( + full_details or best, + fallback_title=title, + source=f"{source}_title_search", + confidence=round(best_score, 3), + ) + if corrected: + return corrected + except Exception as e: + logger.debug(f"Title search failed for '{title}' via {source}: {e}") + # Rate limit courtesy + if context.sleep_or_stop(0.2): + return None return None + @staticmethod + def _get_track_value(payload, key, default=None): + if isinstance(payload, dict): + return payload.get(key, default) + return getattr(payload, key, default) + + def _iter_source_track_ids(self, track: dict, primary_source: str): + source_fields = { + 'spotify': 'spotify_track_id', + 'deezer': 'deezer_track_id', + 'itunes': 'itunes_track_id', + } + ordered_sources = [source for source in self._iter_source_priority(primary_source, _TRACK_ID_SOURCES) if source in source_fields] + for source in ordered_sources: + source_id = track.get(source_fields[source]) + if source_id: + yield source, source_id + + @staticmethod + def _iter_source_priority(primary_source: str, allowed_sources: set[str]): + return [source for source in get_source_priority(primary_source) if source in allowed_sources] + + def _pick_best_track_candidate(self, title: str, album_title: str, results): + from difflib import SequenceMatcher + + best = None + best_score = 0.0 + title_lower = title.lower() + album_lower = album_title.lower() if album_title else '' + + for candidate in results: + candidate_name = self._get_track_value(candidate, 'name', '') or '' + if not candidate_name: + continue + name_sim = SequenceMatcher(None, title_lower, candidate_name.lower()).ratio() + + candidate_album = self._get_track_value(candidate, 'album', '') or '' + if album_lower and candidate_album: + if isinstance(candidate_album, dict): + candidate_album = candidate_album.get('name') or candidate_album.get('title') or '' + album_sim = SequenceMatcher(None, album_lower, str(candidate_album).lower()).ratio() + name_sim = (name_sim * 0.7) + (album_sim * 0.3) + + if name_sim > best_score: + best_score = name_sim + best = candidate + + return best, best_score + + def _build_corrected_metadata(self, payload, fallback_title: str, source: str, confidence: float): + if not payload: + return None + + artist = self._get_track_value(payload, 'primary_artist', '') or '' + artists = self._get_track_value(payload, 'artists', []) or [] + if not artist and artists: + if isinstance(artists, list): + first_artist = artists[0] + if isinstance(first_artist, dict): + artist = first_artist.get('name', '') + else: + artist = str(first_artist) + + artist = (artist or '').strip() + if not artist or artist.lower() in _UNKNOWN_NAMES: + return None + + album = self._get_track_value(payload, 'album', {}) or {} + if isinstance(album, dict): + album_name = album.get('name', '') or album.get('title', '') or '' + year = (album.get('release_date', '') or '')[:4] + image_url = '' + images = album.get('images') or [] + if images: + first_image = images[0] + if isinstance(first_image, dict): + image_url = first_image.get('url', '') or '' + else: + album_name = str(album) + year = '' + image_url = '' + + image_url = self._get_track_value(payload, 'image_url', image_url) or image_url + + title = self._get_track_value(payload, 'name', fallback_title) or fallback_title + + return { + 'artist': artist, + 'album': album_name, + 'title': title, + 'track_number': self._get_track_value(payload, 'track_number'), + 'disc_number': self._get_track_value(payload, 'disc_number', 1) or 1, + 'year': year, + 'image_url': image_url, + 'source': source, + 'confidence': confidence, + } + def _apply_fix(self, context, track, corrected, resolved_path, expected_rel, transfer, fix_tags, reorganize_files): """Apply the fix: re-tag file, move to correct path, update DB.""" diff --git a/tests/test_unknown_artist_fixer.py b/tests/test_unknown_artist_fixer.py new file mode 100644 index 00000000..2c6d7c90 --- /dev/null +++ b/tests/test_unknown_artist_fixer.py @@ -0,0 +1,238 @@ +import sys +import types +from types import SimpleNamespace + +if "spotipy" not in sys.modules: + spotipy = types.ModuleType("spotipy") + + class _DummySpotify: + def __init__(self, *args, **kwargs): + pass + + oauth2 = types.ModuleType("spotipy.oauth2") + + class _DummyOAuth: + def __init__(self, *args, **kwargs): + pass + + spotipy.Spotify = _DummySpotify + oauth2.SpotifyOAuth = _DummyOAuth + oauth2.SpotifyClientCredentials = _DummyOAuth + spotipy.oauth2 = oauth2 + sys.modules["spotipy"] = spotipy + sys.modules["spotipy.oauth2"] = oauth2 + +if "config.settings" not in sys.modules: + config_pkg = types.ModuleType("config") + settings_mod = types.ModuleType("config.settings") + + class _DummyConfigManager: + def get(self, key, default=None): + return default + + settings_mod.config_manager = _DummyConfigManager() + config_pkg.settings = settings_mod + sys.modules["config"] = config_pkg + sys.modules["config.settings"] = settings_mod + +from core.repair_jobs.unknown_artist_fixer import UnknownArtistFixerJob +import core.repair_jobs.unknown_artist_fixer as unknown_artist_fixer_module + + +class _FakeClient: + def __init__(self, track_details=None, search_results=None): + self.track_details = track_details or {} + self.search_results = search_results or {} + self.get_calls = [] + self.search_calls = [] + + def get_track_details(self, track_id): + self.get_calls.append(track_id) + return self.track_details.get(track_id) + + def search_tracks(self, query, limit=5): + self.search_calls.append((query, limit)) + return self.search_results.get(query, []) + + +def _install_tag_reader(monkeypatch, tags=None): + fake_module = types.ModuleType("core.tag_writer") + fake_module.read_file_tags = lambda path: tags or {} + monkeypatch.setitem(sys.modules, "core.tag_writer", fake_module) + + +def test_unknown_artist_fixer_uses_primary_source_track_id_first(monkeypatch): + job = UnknownArtistFixerJob() + _install_tag_reader(monkeypatch) + + deezer_client = _FakeClient( + track_details={ + "dz-1": { + "primary_artist": "Deezer Artist", + "album": { + "name": "Deezer Album", + "release_date": "2024-02-01", + "images": [{"url": "https://img/deezer"}], + }, + "name": "Deezer Song", + "track_number": 7, + "disc_number": 1, + } + } + ) + spotify_client = _FakeClient( + track_details={ + "sp-1": { + "primary_artist": "Spotify Artist", + "album": { + "name": "Spotify Album", + "release_date": "2023-01-01", + "images": [{"url": "https://img/spotify"}], + }, + "name": "Spotify Song", + "track_number": 1, + "disc_number": 1, + } + } + ) + + monkeypatch.setattr(unknown_artist_fixer_module, "get_primary_source", lambda: "deezer") + monkeypatch.setattr( + unknown_artist_fixer_module, + "get_client_for_source", + lambda source: {"deezer": deezer_client, "spotify": spotify_client}.get(source), + ) + + track = { + "title": "Unknown Title", + "album_title": "Unknown Album", + "spotify_track_id": "sp-1", + "deezer_track_id": "dz-1", + "itunes_track_id": "", + } + + result = job._resolve_metadata(SimpleNamespace(), track, "/tmp/track.flac") + + assert result["artist"] == "Deezer Artist" + assert result["album"] == "Deezer Album" + assert result["source"] == "deezer_track_id_lookup" + assert deezer_client.get_calls == ["dz-1"] + assert spotify_client.get_calls == [] + + +def test_unknown_artist_fixer_searches_primary_source_first(monkeypatch): + job = UnknownArtistFixerJob() + _install_tag_reader(monkeypatch) + + candidate = SimpleNamespace( + id="dz-song-1", + name="Matching Title", + album="Matching Album", + artists=["Deezer Artist"], + image_url="https://img/deezer-search", + ) + + deezer_client = _FakeClient( + track_details={ + "dz-song-1": { + "primary_artist": "Deezer Artist", + "album": { + "name": "Matching Album", + "release_date": "2024-03-02", + "images": [{"url": "https://img/deezer-full"}], + }, + "name": "Matching Title", + "track_number": 4, + "disc_number": 1, + } + }, + search_results={"Matching Title": [candidate]}, + ) + spotify_client = _FakeClient( + search_results={ + "Matching Title": [ + SimpleNamespace( + id="sp-song-1", + name="Matching Title", + album="Matching Album", + artists=["Spotify Artist"], + image_url="https://img/spotify-search", + ) + ] + } + ) + + monkeypatch.setattr(unknown_artist_fixer_module, "get_primary_source", lambda: "deezer") + monkeypatch.setattr( + unknown_artist_fixer_module, + "get_client_for_source", + lambda source: {"deezer": deezer_client, "spotify": spotify_client}.get(source), + ) + + track = { + "title": "Matching Title", + "album_title": "Matching Album", + "spotify_track_id": "", + "deezer_track_id": "", + "itunes_track_id": "", + } + + result = job._resolve_metadata(SimpleNamespace(sleep_or_stop=lambda seconds: False), track, "/tmp/track.flac") + + assert result["artist"] == "Deezer Artist" + assert result["album"] == "Matching Album" + assert result["source"] == "deezer_title_search" + assert deezer_client.search_calls == [("Matching Title", 5)] + assert spotify_client.search_calls == [] + + +def test_unknown_artist_fixer_supports_hydrabase_title_search(monkeypatch): + job = UnknownArtistFixerJob() + _install_tag_reader(monkeypatch) + + hydrabase_candidate = SimpleNamespace( + id="hy-song-1", + name="Hydra Match", + album="Hydra Album", + artists=["Hydra Artist"], + image_url="https://img/hydra-search", + ) + hydrabase_client = _FakeClient( + track_details={ + "hy-song-1": { + "primary_artist": "Hydra Artist", + "album": { + "name": "Hydra Album", + "release_date": "2024-04-03", + "images": [{"url": "https://img/hydra-full"}], + }, + "name": "Hydra Match", + "track_number": 2, + "disc_number": 1, + } + }, + search_results={"Hydra Match": [hydrabase_candidate]}, + ) + spotify_client = _FakeClient() + + monkeypatch.setattr(unknown_artist_fixer_module, "get_primary_source", lambda: "hydrabase") + monkeypatch.setattr( + unknown_artist_fixer_module, + "get_client_for_source", + lambda source: {"hydrabase": hydrabase_client, "spotify": spotify_client}.get(source), + ) + + track = { + "title": "Hydra Match", + "album_title": "Hydra Album", + "spotify_track_id": "", + "deezer_track_id": "", + "itunes_track_id": "", + } + + result = job._resolve_metadata(SimpleNamespace(sleep_or_stop=lambda seconds: False), track, "/tmp/track.flac") + + assert result["artist"] == "Hydra Artist" + assert result["source"] == "hydrabase_title_search" + assert hydrabase_client.search_calls == [("Hydra Match", 5)] + assert spotify_client.search_calls == []