Use metadata source priority in unknown artist fixer job

Unknown artist resolution now uses the shared metadata source priority and only filters to the sources that can actually participate in this job. Deezer and iTunes remain direct lookup sources, while Hydrabase can now join the title-search path when it is the configured priority source.
pull/301/head
Antti Kettunen 4 weeks ago
parent fe399636b2
commit 6dca19ca1e

@ -8,9 +8,8 @@ import os
import re
import shutil
import sys
import time
from core.metadata_service import get_client_for_source, get_primary_client, get_primary_source
from core.metadata_service import get_client_for_source, get_primary_source, get_source_priority
from core.repair_jobs import register_job
from core.repair_jobs.base import JobContext, JobResult, RepairJob
from utils.logging_config import get_logger
@ -18,6 +17,8 @@ from utils.logging_config import get_logger
# Module-level logger for the unknown-artist repair job.
logger = get_logger("repair_job.unknown_artist_fixer")
# Lower-cased artist names that count as "no real artist"; resolved artists are
# compared against this set after lower-casing. The empty string covers blanks.
_UNKNOWN_NAMES = {'unknown artist', 'unknown', ''}
# Metadata sources whose stored per-track IDs may be used for a direct ID lookup.
_TRACK_ID_SOURCES = {'spotify', 'deezer', 'itunes'}
# Metadata sources that may take part in the title-search fallback.
_TITLE_SEARCH_SOURCES = {'spotify', 'deezer', 'itunes', 'hydrabase'}
# Sidecar extensions to move alongside audio files
_SIDECAR_EXTS = {'.lrc', '.jpg', '.jpeg', '.png', '.nfo', '.txt', '.cue'}
@ -249,6 +250,7 @@ class UnknownArtistFixerJob(RepairJob):
Returns dict with artist, album, track_number, year, etc. or None."""
title = track['title'] or ''
primary_source = get_primary_source()
# Priority 1: Read embedded file tags
try:
@ -269,96 +271,160 @@ class UnknownArtistFixerJob(RepairJob):
except Exception as e:
logger.debug(f"Failed to read tags from {resolved_path}: {e}")
# Priority 2: Look up by source track ID using the appropriate client.
# Try the primary source's ID first, then fall back to any available ID
# with its matching client so we never pass a Deezer/iTunes ID to Spotify
# (or vice-versa).
_primary = get_primary_source()
_id_candidates = []
for _src in [_primary] + [s for s in ('spotify', 'deezer', 'itunes') if s != _primary]:
_tid = track.get(f'{_src}_track_id')
if _tid:
_id_candidates.append((_src, _tid))
source_id = None
_lookup_client = None
for _src, _tid in _id_candidates:
_c = get_client_for_source(_src)
if _c:
source_id = _tid
_lookup_client = _c
break
if source_id and _lookup_client:
# Priority 2: Look up by source track ID
for source, source_id in self._iter_source_track_ids(track, primary_source):
client = get_client_for_source(source)
if not client or not hasattr(client, 'get_track_details'):
continue
try:
details = _lookup_client.get_track_details(str(source_id))
if details and details.get('primary_artist'):
artist = details['primary_artist']
if artist.lower() not in _UNKNOWN_NAMES:
album = details.get('album', {})
album_name = album.get('name', '') if isinstance(album, dict) else str(album)
return {
'artist': artist,
'album': album_name,
'title': details.get('name', title),
'track_number': details.get('track_number'),
'disc_number': details.get('disc_number', 1),
'year': (album.get('release_date', '') or '')[:4] if isinstance(album, dict) else '',
'image_url': album.get('images', [{}])[0].get('url', '') if isinstance(album, dict) and album.get('images') else '',
'source': 'track_id_lookup',
'confidence': 0.95,
}
details = client.get_track_details(str(source_id))
corrected = self._build_corrected_metadata(
details,
fallback_title=title,
source=f"{source}_track_id_lookup",
confidence=0.95,
)
if corrected:
return corrected
except Exception as e:
logger.debug(f"Track ID lookup failed for {source_id}: {e}")
logger.debug(f"Track ID lookup failed for {source} {source_id}: {e}")
# Priority 3: Search by title
if title:
for source in self._iter_source_priority(primary_source, _TITLE_SEARCH_SOURCES):
client = get_client_for_source(source)
if not client or not hasattr(client, 'search_tracks'):
continue
try:
results = client.search_tracks(title, limit=5)
if not results:
continue
# Priority 3: Search by title using the configured primary metadata source
_search_client = get_primary_client()
if title and _search_client:
try:
results = _search_client.search_tracks(title, limit=5)
if results:
# Score candidates
from difflib import SequenceMatcher
best = None
best_score = 0
for r in results:
name_sim = SequenceMatcher(None, title.lower(), r.name.lower()).ratio()
# Boost if album matches
album_name = r.album if hasattr(r, 'album') else ''
if album_name and track.get('album_title'):
album_sim = SequenceMatcher(None, track['album_title'].lower(), album_name.lower()).ratio()
name_sim = (name_sim * 0.7) + (album_sim * 0.3)
if name_sim > best_score:
best_score = name_sim
best = r
if best and best_score >= 0.7:
artist = best.artists[0] if best.artists else ''
if artist and artist.lower() not in _UNKNOWN_NAMES:
# Get full details for track_number
best, best_score = self._pick_best_track_candidate(title, track.get('album_title'), results)
if not best or best_score < 0.7:
continue
full_details = None
if hasattr(client, 'get_track_details') and getattr(best, 'id', None):
try:
full_details = client.get_track_details(str(best.id))
except Exception:
full_details = None
try:
full_details = _search_client.get_track_details(best.id)
except Exception:
pass
album_data = full_details.get('album', {}) if full_details else {}
return {
'artist': artist,
'album': best.album if hasattr(best, 'album') else '',
'title': best.name,
'track_number': full_details.get('track_number') if full_details else None,
'disc_number': full_details.get('disc_number', 1) if full_details else 1,
'year': (album_data.get('release_date', '') or '')[:4] if isinstance(album_data, dict) else '',
'image_url': getattr(best, 'image_url', '') or '',
'source': 'title_search',
'confidence': round(best_score, 3),
}
except Exception as e:
logger.debug(f"Title search failed for '{title}': {e}")
# Rate limit courtesy
if context.sleep_or_stop(0.2):
return None
corrected = self._build_corrected_metadata(
full_details or best,
fallback_title=title,
source=f"{source}_title_search",
confidence=round(best_score, 3),
)
if corrected:
return corrected
except Exception as e:
logger.debug(f"Title search failed for '{title}' via {source}: {e}")
# Rate limit courtesy
if context.sleep_or_stop(0.2):
return None
return None
@staticmethod
def _get_track_value(payload, key, default=None):
if isinstance(payload, dict):
return payload.get(key, default)
return getattr(payload, key, default)
def _iter_source_track_ids(self, track: dict, primary_source: str):
    """Yield ``(source, track_id)`` pairs for the source IDs stored on ``track``.

    Sources are visited in the order given by ``_iter_source_priority``
    restricted to ``_TRACK_ID_SOURCES``. A source is skipped when it has no
    known ID field or when the track's value for that field is empty.
    """
    id_field_by_source = {
        'spotify': 'spotify_track_id',
        'deezer': 'deezer_track_id',
        'itunes': 'itunes_track_id',
    }
    for candidate_source in self._iter_source_priority(primary_source, _TRACK_ID_SOURCES):
        id_field = id_field_by_source.get(candidate_source)
        if id_field is None:
            continue
        track_id = track.get(id_field)
        if track_id:
            yield candidate_source, track_id
@staticmethod
def _iter_source_priority(primary_source: str, allowed_sources: set[str]):
    """Return the prioritised sources that are also in ``allowed_sources``.

    The order is whatever ``get_source_priority(primary_source)`` produces;
    entries not contained in ``allowed_sources`` are dropped. The result is
    a list, not a lazy iterator.
    """
    permitted = []
    for candidate in get_source_priority(primary_source):
        if candidate in allowed_sources:
            permitted.append(candidate)
    return permitted
def _pick_best_track_candidate(self, title: str, album_title: str, results):
from difflib import SequenceMatcher
best = None
best_score = 0.0
title_lower = title.lower()
album_lower = album_title.lower() if album_title else ''
for candidate in results:
candidate_name = self._get_track_value(candidate, 'name', '') or ''
if not candidate_name:
continue
name_sim = SequenceMatcher(None, title_lower, candidate_name.lower()).ratio()
candidate_album = self._get_track_value(candidate, 'album', '') or ''
if album_lower and candidate_album:
if isinstance(candidate_album, dict):
candidate_album = candidate_album.get('name') or candidate_album.get('title') or ''
album_sim = SequenceMatcher(None, album_lower, str(candidate_album).lower()).ratio()
name_sim = (name_sim * 0.7) + (album_sim * 0.3)
if name_sim > best_score:
best_score = name_sim
best = candidate
return best, best_score
def _build_corrected_metadata(self, payload, fallback_title: str, source: str, confidence: float):
if not payload:
return None
artist = self._get_track_value(payload, 'primary_artist', '') or ''
artists = self._get_track_value(payload, 'artists', []) or []
if not artist and artists:
if isinstance(artists, list):
first_artist = artists[0]
if isinstance(first_artist, dict):
artist = first_artist.get('name', '')
else:
artist = str(first_artist)
artist = (artist or '').strip()
if not artist or artist.lower() in _UNKNOWN_NAMES:
return None
album = self._get_track_value(payload, 'album', {}) or {}
if isinstance(album, dict):
album_name = album.get('name', '') or album.get('title', '') or ''
year = (album.get('release_date', '') or '')[:4]
image_url = ''
images = album.get('images') or []
if images:
first_image = images[0]
if isinstance(first_image, dict):
image_url = first_image.get('url', '') or ''
else:
album_name = str(album)
year = ''
image_url = ''
image_url = self._get_track_value(payload, 'image_url', image_url) or image_url
title = self._get_track_value(payload, 'name', fallback_title) or fallback_title
return {
'artist': artist,
'album': album_name,
'title': title,
'track_number': self._get_track_value(payload, 'track_number'),
'disc_number': self._get_track_value(payload, 'disc_number', 1) or 1,
'year': year,
'image_url': image_url,
'source': source,
'confidence': confidence,
}
def _apply_fix(self, context, track, corrected, resolved_path,
expected_rel, transfer, fix_tags, reorganize_files):
"""Apply the fix: re-tag file, move to correct path, update DB."""

@ -0,0 +1,238 @@
import sys
import types
from types import SimpleNamespace
if "spotipy" not in sys.modules:
    # Register a minimal stand-in for the spotipy package so modules that
    # import it can be loaded here without the real dependency.
    class _DummySpotify:
        """Placeholder for ``spotipy.Spotify``; accepts and ignores all arguments."""

        def __init__(self, *args, **kwargs):
            pass

    class _DummyOAuth:
        """Placeholder for the spotipy auth managers; accepts and ignores all arguments."""

        def __init__(self, *args, **kwargs):
            pass

    spotipy = types.ModuleType("spotipy")
    oauth2 = types.ModuleType("spotipy.oauth2")

    spotipy.Spotify = _DummySpotify
    oauth2.SpotifyOAuth = oauth2.SpotifyClientCredentials = _DummyOAuth
    spotipy.oauth2 = oauth2

    sys.modules.update({"spotipy": spotipy, "spotipy.oauth2": oauth2})
if "config.settings" not in sys.modules:
    # Register a minimal stand-in for ``config.settings`` so modules that
    # import ``config_manager`` from it can be loaded here.
    class _DummyConfigManager:
        """Config stand-in whose lookups always return the caller's default."""

        def get(self, key, default=None):
            return default

    settings_mod = types.ModuleType("config.settings")
    settings_mod.config_manager = _DummyConfigManager()

    config_pkg = types.ModuleType("config")
    config_pkg.settings = settings_mod

    sys.modules.update({"config": config_pkg, "config.settings": settings_mod})
from core.repair_jobs.unknown_artist_fixer import UnknownArtistFixerJob
import core.repair_jobs.unknown_artist_fixer as unknown_artist_fixer_module
class _FakeClient:
def __init__(self, track_details=None, search_results=None):
self.track_details = track_details or {}
self.search_results = search_results or {}
self.get_calls = []
self.search_calls = []
def get_track_details(self, track_id):
self.get_calls.append(track_id)
return self.track_details.get(track_id)
def search_tracks(self, query, limit=5):
self.search_calls.append((query, limit))
return self.search_results.get(query, [])
def _install_tag_reader(monkeypatch, tags=None):
fake_module = types.ModuleType("core.tag_writer")
fake_module.read_file_tags = lambda path: tags or {}
monkeypatch.setitem(sys.modules, "core.tag_writer", fake_module)
def test_unknown_artist_fixer_uses_primary_source_track_id_first(monkeypatch):
    """With Deezer as primary source, the Deezer track ID is resolved and Spotify is never queried."""
    job = UnknownArtistFixerJob()
    _install_tag_reader(monkeypatch)

    deezer_details = {
        "primary_artist": "Deezer Artist",
        "album": {
            "name": "Deezer Album",
            "release_date": "2024-02-01",
            "images": [{"url": "https://img/deezer"}],
        },
        "name": "Deezer Song",
        "track_number": 7,
        "disc_number": 1,
    }
    spotify_details = {
        "primary_artist": "Spotify Artist",
        "album": {
            "name": "Spotify Album",
            "release_date": "2023-01-01",
            "images": [{"url": "https://img/spotify"}],
        },
        "name": "Spotify Song",
        "track_number": 1,
        "disc_number": 1,
    }
    clients = {
        "deezer": _FakeClient(track_details={"dz-1": deezer_details}),
        "spotify": _FakeClient(track_details={"sp-1": spotify_details}),
    }

    def _client_for(source):
        return clients.get(source)

    monkeypatch.setattr(unknown_artist_fixer_module, "get_primary_source", lambda: "deezer")
    monkeypatch.setattr(unknown_artist_fixer_module, "get_client_for_source", _client_for)

    # The track carries IDs for both sources; only the primary one should be used.
    local_track = {
        "title": "Unknown Title",
        "album_title": "Unknown Album",
        "spotify_track_id": "sp-1",
        "deezer_track_id": "dz-1",
        "itunes_track_id": "",
    }

    resolved = job._resolve_metadata(SimpleNamespace(), local_track, "/tmp/track.flac")

    assert resolved["artist"] == "Deezer Artist"
    assert resolved["album"] == "Deezer Album"
    assert resolved["source"] == "deezer_track_id_lookup"
    assert clients["deezer"].get_calls == ["dz-1"]
    assert clients["spotify"].get_calls == []
def test_unknown_artist_fixer_searches_primary_source_first(monkeypatch):
    """Without track IDs, title search goes to the primary source (Deezer) and stops once it matches."""
    job = UnknownArtistFixerJob()
    _install_tag_reader(monkeypatch)

    deezer_hit = SimpleNamespace(
        id="dz-song-1",
        name="Matching Title",
        album="Matching Album",
        artists=["Deezer Artist"],
        image_url="https://img/deezer-search",
    )
    spotify_hit = SimpleNamespace(
        id="sp-song-1",
        name="Matching Title",
        album="Matching Album",
        artists=["Spotify Artist"],
        image_url="https://img/spotify-search",
    )
    deezer_full_details = {
        "primary_artist": "Deezer Artist",
        "album": {
            "name": "Matching Album",
            "release_date": "2024-03-02",
            "images": [{"url": "https://img/deezer-full"}],
        },
        "name": "Matching Title",
        "track_number": 4,
        "disc_number": 1,
    }
    clients = {
        "deezer": _FakeClient(
            track_details={"dz-song-1": deezer_full_details},
            search_results={"Matching Title": [deezer_hit]},
        ),
        "spotify": _FakeClient(search_results={"Matching Title": [spotify_hit]}),
    }

    def _client_for(source):
        return clients.get(source)

    monkeypatch.setattr(unknown_artist_fixer_module, "get_primary_source", lambda: "deezer")
    monkeypatch.setattr(unknown_artist_fixer_module, "get_client_for_source", _client_for)

    # No source IDs at all, so resolution has to fall through to title search.
    local_track = {
        "title": "Matching Title",
        "album_title": "Matching Album",
        "spotify_track_id": "",
        "deezer_track_id": "",
        "itunes_track_id": "",
    }
    never_stop = SimpleNamespace(sleep_or_stop=lambda seconds: False)

    resolved = job._resolve_metadata(never_stop, local_track, "/tmp/track.flac")

    assert resolved["artist"] == "Deezer Artist"
    assert resolved["album"] == "Matching Album"
    assert resolved["source"] == "deezer_title_search"
    assert clients["deezer"].search_calls == [("Matching Title", 5)]
    assert clients["spotify"].search_calls == []
def test_unknown_artist_fixer_supports_hydrabase_title_search(monkeypatch):
    """With Hydrabase as primary source, title search resolves through Hydrabase and skips Spotify."""
    job = UnknownArtistFixerJob()
    _install_tag_reader(monkeypatch)

    hydrabase_hit = SimpleNamespace(
        id="hy-song-1",
        name="Hydra Match",
        album="Hydra Album",
        artists=["Hydra Artist"],
        image_url="https://img/hydra-search",
    )
    hydrabase_full_details = {
        "primary_artist": "Hydra Artist",
        "album": {
            "name": "Hydra Album",
            "release_date": "2024-04-03",
            "images": [{"url": "https://img/hydra-full"}],
        },
        "name": "Hydra Match",
        "track_number": 2,
        "disc_number": 1,
    }
    clients = {
        "hydrabase": _FakeClient(
            track_details={"hy-song-1": hydrabase_full_details},
            search_results={"Hydra Match": [hydrabase_hit]},
        ),
        "spotify": _FakeClient(),
    }

    def _client_for(source):
        return clients.get(source)

    monkeypatch.setattr(unknown_artist_fixer_module, "get_primary_source", lambda: "hydrabase")
    monkeypatch.setattr(unknown_artist_fixer_module, "get_client_for_source", _client_for)

    # No source IDs, so only the title-search path can produce a result.
    local_track = {
        "title": "Hydra Match",
        "album_title": "Hydra Album",
        "spotify_track_id": "",
        "deezer_track_id": "",
        "itunes_track_id": "",
    }
    never_stop = SimpleNamespace(sleep_or_stop=lambda seconds: False)

    resolved = job._resolve_metadata(never_stop, local_track, "/tmp/track.flac")

    assert resolved["artist"] == "Hydra Artist"
    assert resolved["source"] == "hydrabase_title_search"
    assert clients["hydrabase"].search_calls == [("Hydra Match", 5)]
    assert clients["spotify"].search_calls == []
Loading…
Cancel
Save