From 6dca19ca1eefefe840001ecc560a60e55e16232d Mon Sep 17 00:00:00 2001 From: Antti Kettunen Date: Wed, 15 Apr 2026 19:35:18 +0300 Subject: [PATCH] Use metadata source priority in unknown artist fixer job Unknown artist resolution now uses the shared metadata source priority and only filters to the sources that can actually participate in this job. Deezer and iTunes remain direct lookup sources, while Hydrabase can now join the title-search path when it is the configured priority source. --- core/repair_jobs/unknown_artist_fixer.py | 236 ++++++++++++++-------- tests/test_unknown_artist_fixer.py | 238 +++++++++++++++++++++++ 2 files changed, 389 insertions(+), 85 deletions(-) create mode 100644 tests/test_unknown_artist_fixer.py diff --git a/core/repair_jobs/unknown_artist_fixer.py b/core/repair_jobs/unknown_artist_fixer.py index c81663c1..01086989 100644 --- a/core/repair_jobs/unknown_artist_fixer.py +++ b/core/repair_jobs/unknown_artist_fixer.py @@ -8,9 +8,8 @@ import os import re import shutil import sys -import time -from core.metadata_service import get_client_for_source, get_primary_client, get_primary_source +from core.metadata_service import get_client_for_source, get_primary_source, get_source_priority from core.repair_jobs import register_job from core.repair_jobs.base import JobContext, JobResult, RepairJob from utils.logging_config import get_logger @@ -18,6 +17,8 @@ from utils.logging_config import get_logger logger = get_logger("repair_job.unknown_artist_fixer") _UNKNOWN_NAMES = {'unknown artist', 'unknown', ''} +_TRACK_ID_SOURCES = {'spotify', 'deezer', 'itunes'} +_TITLE_SEARCH_SOURCES = {'spotify', 'deezer', 'itunes', 'hydrabase'} # Sidecar extensions to move alongside audio files _SIDECAR_EXTS = {'.lrc', '.jpg', '.jpeg', '.png', '.nfo', '.txt', '.cue'} @@ -249,6 +250,7 @@ class UnknownArtistFixerJob(RepairJob): Returns dict with artist, album, track_number, year, etc. or None.""" title = track['title'] or '' + primary_source = get_primary_source() # Priority 1: Read embedded file tags try: @@ -269,96 +271,160 @@ class UnknownArtistFixerJob(RepairJob): except Exception as e: logger.debug(f"Failed to read tags from {resolved_path}: {e}") - # Priority 2: Look up by source track ID using the appropriate client. - # Try the primary source's ID first, then fall back to any available ID - # with its matching client so we never pass a Deezer/iTunes ID to Spotify - # (or vice-versa). - _primary = get_primary_source() - _id_candidates = [] - for _src in [_primary] + [s for s in ('spotify', 'deezer', 'itunes') if s != _primary]: - _tid = track.get(f'{_src}_track_id') - if _tid: - _id_candidates.append((_src, _tid)) - source_id = None - _lookup_client = None - for _src, _tid in _id_candidates: - _c = get_client_for_source(_src) - if _c: - source_id = _tid - _lookup_client = _c - break - if source_id and _lookup_client: + # Priority 2: Look up by source track ID + for source, source_id in self._iter_source_track_ids(track, primary_source): + client = get_client_for_source(source) + if not client or not hasattr(client, 'get_track_details'): + continue try: - details = _lookup_client.get_track_details(str(source_id)) - if details and details.get('primary_artist'): - artist = details['primary_artist'] - if artist.lower() not in _UNKNOWN_NAMES: - album = details.get('album', {}) - album_name = album.get('name', '') if isinstance(album, dict) else str(album) - return { - 'artist': artist, - 'album': album_name, - 'title': details.get('name', title), - 'track_number': details.get('track_number'), - 'disc_number': details.get('disc_number', 1), - 'year': (album.get('release_date', '') or '')[:4] if isinstance(album, dict) else '', - 'image_url': album.get('images', [{}])[0].get('url', '') if isinstance(album, dict) and album.get('images') else '', - 'source': 'track_id_lookup', - 'confidence': 0.95, - } + details = client.get_track_details(str(source_id)) + corrected = self._build_corrected_metadata( + details, + fallback_title=title, + source=f"{source}_track_id_lookup", + confidence=0.95, + ) + if corrected: + return corrected except Exception as e: - logger.debug(f"Track ID lookup failed for {source_id}: {e}") + logger.debug(f"Track ID lookup failed for {source} {source_id}: {e}") + + # Priority 3: Search by title + if title: + for source in self._iter_source_priority(primary_source, _TITLE_SEARCH_SOURCES): + client = get_client_for_source(source) + if not client or not hasattr(client, 'search_tracks'): + continue + try: + results = client.search_tracks(title, limit=5) + if not results: + continue - # Priority 3: Search by title using the configured primary metadata source - _search_client = get_primary_client() - if title and _search_client: - try: - results = _search_client.search_tracks(title, limit=5) - if results: - # Score candidates - from difflib import SequenceMatcher - best = None - best_score = 0 - for r in results: - name_sim = SequenceMatcher(None, title.lower(), r.name.lower()).ratio() - # Boost if album matches - album_name = r.album if hasattr(r, 'album') else '' - if album_name and track.get('album_title'): - album_sim = SequenceMatcher(None, track['album_title'].lower(), album_name.lower()).ratio() - name_sim = (name_sim * 0.7) + (album_sim * 0.3) - if name_sim > best_score: - best_score = name_sim - best = r - - if best and best_score >= 0.7: - artist = best.artists[0] if best.artists else '' - if artist and artist.lower() not in _UNKNOWN_NAMES: - # Get full details for track_number + best, best_score = self._pick_best_track_candidate(title, track.get('album_title'), results) + if not best or best_score < 0.7: + continue + + full_details = None + if hasattr(client, 'get_track_details') and getattr(best, 'id', None): + try: + full_details = client.get_track_details(str(best.id)) + except Exception: full_details = None - try: - full_details = _search_client.get_track_details(best.id) - except Exception: - pass - album_data = full_details.get('album', {}) if full_details else {} - return { - 'artist': artist, - 'album': best.album if hasattr(best, 'album') else '', - 'title': best.name, - 'track_number': full_details.get('track_number') if full_details else None, - 'disc_number': full_details.get('disc_number', 1) if full_details else 1, - 'year': (album_data.get('release_date', '') or '')[:4] if isinstance(album_data, dict) else '', - 'image_url': getattr(best, 'image_url', '') or '', - 'source': 'title_search', - 'confidence': round(best_score, 3), - } - except Exception as e: - logger.debug(f"Title search failed for '{title}': {e}") - # Rate limit courtesy - if context.sleep_or_stop(0.2): - return None + + corrected = self._build_corrected_metadata( + full_details or best, + fallback_title=title, + source=f"{source}_title_search", + confidence=round(best_score, 3), + ) + if corrected: + return corrected + except Exception as e: + logger.debug(f"Title search failed for '{title}' via {source}: {e}") + # Rate limit courtesy + if context.sleep_or_stop(0.2): + return None return None + @staticmethod + def _get_track_value(payload, key, default=None): + if isinstance(payload, dict): + return payload.get(key, default) + return getattr(payload, key, default) + + def _iter_source_track_ids(self, track: dict, primary_source: str): + source_fields = { + 'spotify': 'spotify_track_id', + 'deezer': 'deezer_track_id', + 'itunes': 'itunes_track_id', + } + ordered_sources = [source for source in self._iter_source_priority(primary_source, _TRACK_ID_SOURCES) if source in source_fields] + for source in ordered_sources: + source_id = track.get(source_fields[source]) + if source_id: + yield source, source_id + + @staticmethod + def _iter_source_priority(primary_source: str, allowed_sources: set[str]): + return [source for source in get_source_priority(primary_source) if source in allowed_sources] + + def _pick_best_track_candidate(self, title: str, album_title: str, results): + from difflib import SequenceMatcher + + best = None + best_score = 0.0 + title_lower = title.lower() + album_lower = album_title.lower() if album_title else '' + + for candidate in results: + candidate_name = self._get_track_value(candidate, 'name', '') or '' + if not candidate_name: + continue + name_sim = SequenceMatcher(None, title_lower, candidate_name.lower()).ratio() + + candidate_album = self._get_track_value(candidate, 'album', '') or '' + if album_lower and candidate_album: + if isinstance(candidate_album, dict): + candidate_album = candidate_album.get('name') or candidate_album.get('title') or '' + album_sim = SequenceMatcher(None, album_lower, str(candidate_album).lower()).ratio() + name_sim = (name_sim * 0.7) + (album_sim * 0.3) + + if name_sim > best_score: + best_score = name_sim + best = candidate + + return best, best_score + + def _build_corrected_metadata(self, payload, fallback_title: str, source: str, confidence: float): + if not payload: + return None + + artist = self._get_track_value(payload, 'primary_artist', '') or '' + artists = self._get_track_value(payload, 'artists', []) or [] + if not artist and artists: + if isinstance(artists, list): + first_artist = artists[0] + if isinstance(first_artist, dict): + artist = first_artist.get('name', '') + else: + artist = str(first_artist) + + artist = (artist or '').strip() + if not artist or artist.lower() in _UNKNOWN_NAMES: + return None + + album = self._get_track_value(payload, 'album', {}) or {} + if isinstance(album, dict): + album_name = album.get('name', '') or album.get('title', '') or '' + year = (album.get('release_date', '') or '')[:4] + image_url = '' + images = album.get('images') or [] + if images: + first_image = images[0] + if isinstance(first_image, dict): + image_url = first_image.get('url', '') or '' + else: + album_name = str(album) + year = '' + image_url = '' + + image_url = self._get_track_value(payload, 'image_url', image_url) or image_url + + title = self._get_track_value(payload, 'name', fallback_title) or fallback_title + + return { + 'artist': artist, + 'album': album_name, + 'title': title, + 'track_number': self._get_track_value(payload, 'track_number'), + 'disc_number': self._get_track_value(payload, 'disc_number', 1) or 1, + 'year': year, + 'image_url': image_url, + 'source': source, + 'confidence': confidence, + } + def _apply_fix(self, context, track, corrected, resolved_path, expected_rel, transfer, fix_tags, reorganize_files): """Apply the fix: re-tag file, move to correct path, update DB.""" diff --git a/tests/test_unknown_artist_fixer.py b/tests/test_unknown_artist_fixer.py new file mode 100644 index 00000000..2c6d7c90 --- /dev/null +++ b/tests/test_unknown_artist_fixer.py @@ -0,0 +1,238 @@ +import sys +import types +from types import SimpleNamespace + +if "spotipy" not in sys.modules: + spotipy = types.ModuleType("spotipy") + + class _DummySpotify: + def __init__(self, *args, **kwargs): + pass + + oauth2 = types.ModuleType("spotipy.oauth2") + + class _DummyOAuth: + def __init__(self, *args, **kwargs): + pass + + spotipy.Spotify = _DummySpotify + oauth2.SpotifyOAuth = _DummyOAuth + oauth2.SpotifyClientCredentials = _DummyOAuth + spotipy.oauth2 = oauth2 + sys.modules["spotipy"] = spotipy + sys.modules["spotipy.oauth2"] = oauth2 + +if "config.settings" not in sys.modules: + config_pkg = types.ModuleType("config") + settings_mod = types.ModuleType("config.settings") + + class _DummyConfigManager: + def get(self, key, default=None): + return default + + settings_mod.config_manager = _DummyConfigManager() + config_pkg.settings = settings_mod + sys.modules["config"] = config_pkg + sys.modules["config.settings"] = settings_mod + +from core.repair_jobs.unknown_artist_fixer import UnknownArtistFixerJob +import core.repair_jobs.unknown_artist_fixer as unknown_artist_fixer_module + + +class _FakeClient: + def __init__(self, track_details=None, search_results=None): + self.track_details = track_details or {} + self.search_results = search_results or {} + self.get_calls = [] + self.search_calls = [] + + def get_track_details(self, track_id): + self.get_calls.append(track_id) + return self.track_details.get(track_id) + + def search_tracks(self, query, limit=5): + self.search_calls.append((query, limit)) + return self.search_results.get(query, []) + + +def _install_tag_reader(monkeypatch, tags=None): + fake_module = types.ModuleType("core.tag_writer") + fake_module.read_file_tags = lambda path: tags or {} + monkeypatch.setitem(sys.modules, "core.tag_writer", fake_module) + + +def test_unknown_artist_fixer_uses_primary_source_track_id_first(monkeypatch): + job = UnknownArtistFixerJob() + _install_tag_reader(monkeypatch) + + deezer_client = _FakeClient( + track_details={ + "dz-1": { + "primary_artist": "Deezer Artist", + "album": { + "name": "Deezer Album", + "release_date": "2024-02-01", + "images": [{"url": "https://img/deezer"}], + }, + "name": "Deezer Song", + "track_number": 7, + "disc_number": 1, + } + } + ) + spotify_client = _FakeClient( + track_details={ + "sp-1": { + "primary_artist": "Spotify Artist", + "album": { + "name": "Spotify Album", + "release_date": "2023-01-01", + "images": [{"url": "https://img/spotify"}], + }, + "name": "Spotify Song", + "track_number": 1, + "disc_number": 1, + } + } + ) + + monkeypatch.setattr(unknown_artist_fixer_module, "get_primary_source", lambda: "deezer") + monkeypatch.setattr( + unknown_artist_fixer_module, + "get_client_for_source", + lambda source: {"deezer": deezer_client, "spotify": spotify_client}.get(source), + ) + + track = { + "title": "Unknown Title", + "album_title": "Unknown Album", + "spotify_track_id": "sp-1", + "deezer_track_id": "dz-1", + "itunes_track_id": "", + } + + result = job._resolve_metadata(SimpleNamespace(), track, "/tmp/track.flac") + + assert result["artist"] == "Deezer Artist" + assert result["album"] == "Deezer Album" + assert result["source"] == "deezer_track_id_lookup" + assert deezer_client.get_calls == ["dz-1"] + assert spotify_client.get_calls == [] + + +def test_unknown_artist_fixer_searches_primary_source_first(monkeypatch): + job = UnknownArtistFixerJob() + _install_tag_reader(monkeypatch) + + candidate = SimpleNamespace( + id="dz-song-1", + name="Matching Title", + album="Matching Album", + artists=["Deezer Artist"], + image_url="https://img/deezer-search", + ) + + deezer_client = _FakeClient( + track_details={ + "dz-song-1": { + "primary_artist": "Deezer Artist", + "album": { + "name": "Matching Album", + "release_date": "2024-03-02", + "images": [{"url": "https://img/deezer-full"}], + }, + "name": "Matching Title", + "track_number": 4, + "disc_number": 1, + } + }, + search_results={"Matching Title": [candidate]}, + ) + spotify_client = _FakeClient( + search_results={ + "Matching Title": [ + SimpleNamespace( + id="sp-song-1", + name="Matching Title", + album="Matching Album", + artists=["Spotify Artist"], + image_url="https://img/spotify-search", + ) + ] + } + ) + + monkeypatch.setattr(unknown_artist_fixer_module, "get_primary_source", lambda: "deezer") + monkeypatch.setattr( + unknown_artist_fixer_module, + "get_client_for_source", + lambda source: {"deezer": deezer_client, "spotify": spotify_client}.get(source), + ) + + track = { + "title": "Matching Title", + "album_title": "Matching Album", + "spotify_track_id": "", + "deezer_track_id": "", + "itunes_track_id": "", + } + + result = job._resolve_metadata(SimpleNamespace(sleep_or_stop=lambda seconds: False), track, "/tmp/track.flac") + + assert result["artist"] == "Deezer Artist" + assert result["album"] == "Matching Album" + assert result["source"] == "deezer_title_search" + assert deezer_client.search_calls == [("Matching Title", 5)] + assert spotify_client.search_calls == [] + + +def test_unknown_artist_fixer_supports_hydrabase_title_search(monkeypatch): + job = UnknownArtistFixerJob() + _install_tag_reader(monkeypatch) + + hydrabase_candidate = SimpleNamespace( + id="hy-song-1", + name="Hydra Match", + album="Hydra Album", + artists=["Hydra Artist"], + image_url="https://img/hydra-search", + ) + hydrabase_client = _FakeClient( + track_details={ + "hy-song-1": { + "primary_artist": "Hydra Artist", + "album": { + "name": "Hydra Album", + "release_date": "2024-04-03", + "images": [{"url": "https://img/hydra-full"}], + }, + "name": "Hydra Match", + "track_number": 2, + "disc_number": 1, + } + }, + search_results={"Hydra Match": [hydrabase_candidate]}, + ) + spotify_client = _FakeClient() + + monkeypatch.setattr(unknown_artist_fixer_module, "get_primary_source", lambda: "hydrabase") + monkeypatch.setattr( + unknown_artist_fixer_module, + "get_client_for_source", + lambda source: {"hydrabase": hydrabase_client, "spotify": spotify_client}.get(source), + ) + + track = { + "title": "Hydra Match", + "album_title": "Hydra Album", + "spotify_track_id": "", + "deezer_track_id": "", + "itunes_track_id": "", + } + + result = job._resolve_metadata(SimpleNamespace(sleep_or_stop=lambda seconds: False), track, "/tmp/track.flac") + + assert result["artist"] == "Hydra Artist" + assert result["source"] == "hydrabase_title_search" + assert hydrabase_client.search_calls == [("Hydra Match", 5)] + assert spotify_client.search_calls == []