Respect source preference in cover art repair job

Cover art lookup now honors an explicit prefer_source first,
falls back to the runtime primary metadata source when unset,
and uses the shared source priority for the remaining fallbacks.
pull/301/head
Antti Kettunen 4 weeks ago
parent e7faa9f02f
commit 6df6ecb560

@ -1,5 +1,6 @@
"""Missing Cover Art Filler Job — finds albums without artwork and locates art from APIs."""
from core.metadata_service import get_client_for_source, get_primary_source, get_source_priority
from core.repair_jobs import register_job
from core.repair_jobs.base import JobContext, JobResult, RepairJob
from utils.logging_config import get_logger
@ -11,29 +12,36 @@ logger = get_logger("repair_job.cover_art")
class MissingCoverArtJob(RepairJob):
job_id = 'missing_cover_art'
display_name = 'Cover Art Filler'
description = 'Finds albums missing artwork and locates art from Spotify/iTunes'
description = 'Finds albums missing artwork and locates art from metadata sources'
help_text = (
'Scans your library for albums that have no cover art stored in the database. '
'For each missing cover, it searches Spotify and iTunes APIs using the album name '
'and artist to find matching artwork.\n\n'
'For each missing cover, it searches the configured metadata sources using the '
'album name and artist to find matching artwork. If Prefer Source is set, that '
'source is tried first; otherwise the primary metadata source is used.\n\n'
'When artwork is found, a finding is created with the image URL so you can review '
'and apply it. The job does not download or embed artwork automatically.\n\n'
'Settings:\n'
'- Prefer Source: Which API to try first for artwork (spotify or itunes)'
'- Prefer Source: Optional source to try first; otherwise the primary metadata source is used'
)
icon = 'repair-icon-coverart'
default_enabled = True
default_interval_hours = 48
default_settings = {
'prefer_source': 'spotify',
}
default_settings = {}
auto_fix = False
def scan(self, context: JobContext) -> JobResult:
result = JobResult()
settings = self._get_settings(context)
prefer_source = settings.get('prefer_source', 'spotify')
primary_source = get_primary_source()
source_priority = get_source_priority(primary_source)
prefer_source = settings.get('prefer_source')
if prefer_source and prefer_source != primary_source and prefer_source in source_priority:
source_priority.remove(prefer_source)
source_priority.insert(0, prefer_source)
if primary_source in source_priority:
source_priority.remove(primary_source)
source_priority.insert(1, primary_source)
# Fetch albums with missing artwork
albums = []
@ -41,9 +49,31 @@ class MissingCoverArtJob(RepairJob):
try:
conn = context.db._get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT al.id, al.title, ar.name, al.spotify_album_id, al.thumb_url,
ar.thumb_url
cursor.execute("PRAGMA table_info(albums)")
album_columns = {column[1] for column in cursor.fetchall()}
select_cols = [
"al.id",
"al.title",
"ar.name",
"al.spotify_album_id",
"al.thumb_url",
"ar.thumb_url",
]
column_map = [
("itunes_album_id", "al.itunes_album_id"),
("deezer_album_id", "al.deezer_id"),
("discogs_album_id", "al.discogs_id"),
("hydrabase_album_id", "al.soul_id"),
]
column_index = {}
for alias, column in column_map:
if column.split('.', 1)[1] in album_columns:
column_index[alias] = len(select_cols)
select_cols.append(f"{column} AS {alias}")
cursor.execute(f"""
SELECT {', '.join(select_cols)}
FROM albums al
LEFT JOIN artists ar ON ar.id = al.artist_id
WHERE (al.thumb_url IS NULL OR al.thumb_url = '')
@ -67,15 +97,20 @@ class MissingCoverArtJob(RepairJob):
if context.report_progress:
context.report_progress(phase=f'Searching artwork for {total} albums...', total=total)
spotify_skipped = False # Track if we've logged the rate limit skip
for i, row in enumerate(albums):
if context.check_stop():
return result
if i % 10 == 0 and context.wait_if_paused():
return result
album_id, title, artist_name, spotify_album_id, _, artist_thumb = row
album_id, title, artist_name, spotify_album_id, _, artist_thumb = row[:6]
source_album_ids = {
'spotify': spotify_album_id,
'itunes': row[column_index['itunes_album_id']] if 'itunes_album_id' in column_index else None,
'deezer': row[column_index['deezer_album_id']] if 'deezer_album_id' in column_index else None,
'discogs': row[column_index['discogs_album_id']] if 'discogs_album_id' in column_index else None,
'hydrabase': row[column_index['hydrabase_album_id']] if 'hydrabase_album_id' in column_index else None,
}
result.scanned += 1
if context.report_progress:
@ -88,24 +123,11 @@ class MissingCoverArtJob(RepairJob):
artwork_url = None
# Check Spotify rate limit at the top level — skip Spotify entirely if banned
spotify_available = not context.is_spotify_rate_limited()
if not spotify_available and not spotify_skipped:
logger.info("Spotify rate limited — skipping Spotify artwork lookups, using fallback only")
spotify_skipped = True
# Try to find artwork URL from APIs
if prefer_source == 'spotify' and spotify_available:
artwork_url = self._try_spotify(spotify_album_id, title, artist_name, context)
if not artwork_url:
artwork_url = self._try_itunes(title, artist_name, context)
elif prefer_source == 'spotify' and not spotify_available:
# Spotify preferred but rate limited — use fallback only
artwork_url = self._try_itunes(title, artist_name, context)
else:
artwork_url = self._try_itunes(title, artist_name, context)
if not artwork_url and spotify_available:
artwork_url = self._try_spotify(spotify_album_id, title, artist_name, context)
# Try source-specific IDs first, then title/artist search, in priority order.
for source in source_priority:
artwork_url = self._try_source(source, source_album_ids.get(source), title, artist_name)
if artwork_url:
break
if artwork_url:
if context.report_progress:
@ -151,47 +173,63 @@ class MissingCoverArtJob(RepairJob):
result.scanned, result.findings_created, result.skipped)
return result
def _try_spotify(self, spotify_album_id, title, artist_name, context):
"""Try to get album art from Spotify."""
client = context.spotify_client
def _try_source(self, source, source_album_id, title, artist_name):
"""Try to get album art from a specific metadata source."""
client = get_client_for_source(source)
if not client:
return None
query = f"{artist_name} {title}" if artist_name else title
try:
if context.is_spotify_rate_limited():
return None
# If we have a Spotify album ID, fetch directly
if spotify_album_id and client.is_spotify_authenticated():
album_data = client.get_album(spotify_album_id)
if album_data:
images = album_data.get('images', [])
if images:
return images[0].get('url')
# Search by name
if title and client.is_spotify_authenticated():
query = f"{artist_name} {title}" if artist_name else title
if source_album_id:
album_data = self._get_album_for_source(source, client, source_album_id)
artwork_url = self._extract_artwork_url(album_data)
if artwork_url:
return artwork_url
if query and hasattr(client, 'search_albums'):
results = client.search_albums(query, limit=1)
if results and hasattr(results[0], 'image_url') and results[0].image_url:
return results[0].image_url
if results:
artwork_url = self._extract_artwork_url(results[0])
if artwork_url:
return artwork_url
candidate_id = self._extract_album_id(results[0])
if candidate_id:
album_data = self._get_album_for_source(source, client, candidate_id)
return self._extract_artwork_url(album_data)
except Exception as e:
logger.debug("Spotify art lookup failed for '%s': %s", title, e)
logger.debug("%s art lookup failed for '%s': %s", source.capitalize(), title, e)
return None
def _try_itunes(self, title, artist_name, context):
"""Try to get album art from iTunes."""
client = context.itunes_client
if not client:
return None
@staticmethod
def _get_album_for_source(source, client, album_id):
if source == 'spotify':
return client.get_album(album_id)
return client.get_album(album_id, include_tracks=False)
try:
query = f"{artist_name} {title}" if artist_name else title
results = client.search_albums(query, limit=1)
if results and hasattr(results[0], 'image_url') and results[0].image_url:
return results[0].image_url
except Exception as e:
logger.debug("iTunes art lookup failed for '%s': %s", title, e)
@staticmethod
def _extract_album_id(item):
if hasattr(item, 'id'):
return getattr(item, 'id', None)
if isinstance(item, dict):
return item.get('id')
return None
@staticmethod
def _extract_artwork_url(item):
if not item:
return None
if hasattr(item, 'image_url') and getattr(item, 'image_url', None):
return item.image_url
if isinstance(item, dict):
if item.get('image_url'):
return item['image_url']
images = item.get('images') or []
if images and isinstance(images, list):
first = images[0]
if isinstance(first, dict):
return first.get('url')
return None
def _get_settings(self, context: JobContext) -> dict:

@ -0,0 +1,167 @@
import sqlite3
import sys
import types
from types import SimpleNamespace
# Stub optional Spotify dependency so metadata_service can import in tests.
if 'spotipy' not in sys.modules:
spotipy = types.ModuleType('spotipy')
oauth2 = types.ModuleType('spotipy.oauth2')
class _DummySpotify:
pass
class _DummyOAuth:
pass
spotipy.Spotify = _DummySpotify
oauth2.SpotifyOAuth = _DummyOAuth
oauth2.SpotifyClientCredentials = _DummyOAuth
spotipy.oauth2 = oauth2
sys.modules['spotipy'] = spotipy
sys.modules['spotipy.oauth2'] = oauth2
if 'config.settings' not in sys.modules:
config_mod = types.ModuleType('config')
settings_mod = types.ModuleType('config.settings')
class _DummyConfigManager:
def get(self, key, default=None):
return default
settings_mod.config_manager = _DummyConfigManager()
config_mod.settings = settings_mod
sys.modules['config'] = config_mod
sys.modules['config.settings'] = settings_mod
from core.repair_jobs import missing_cover_art as mca
class _FakeClient:
def __init__(self, album_image=None, search_image=None):
self.album_image = album_image
self.search_image = search_image
self.get_album_calls = []
self.search_calls = []
def get_album(self, album_id, include_tracks=False):
self.get_album_calls.append((album_id, include_tracks))
if self.album_image is None:
return None
if isinstance(self.album_image, str):
return {'images': [{'url': self.album_image}]}
return self.album_image
def search_albums(self, query, limit=1):
self.search_calls.append((query, limit))
if self.search_image is None:
return []
return [SimpleNamespace(id='search-album', image_url=self.search_image)]
def _make_db(album_row):
conn = sqlite3.connect(':memory:')
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(
"""
CREATE TABLE artists (
id INTEGER PRIMARY KEY,
name TEXT,
thumb_url TEXT
)
"""
)
cursor.execute(
"""
CREATE TABLE albums (
id INTEGER PRIMARY KEY,
title TEXT,
artist_id INTEGER,
thumb_url TEXT,
spotify_album_id TEXT,
itunes_album_id TEXT,
deezer_id TEXT,
discogs_id TEXT,
soul_id TEXT
)
"""
)
cursor.execute(
"INSERT INTO artists (id, name, thumb_url) VALUES (?, ?, ?)",
(1, 'Artist', 'https://artist/thumb'),
)
cursor.execute(
"""
INSERT INTO albums
(id, title, artist_id, thumb_url, spotify_album_id, itunes_album_id, deezer_id, discogs_id, soul_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
album_row,
)
conn.commit()
return conn
def _make_context(conn, prefer_source=None):
job_settings = {}
if prefer_source is not None:
job_settings['prefer_source'] = prefer_source
settings = {'repair.jobs.missing_cover_art.settings': job_settings}
findings = []
return SimpleNamespace(
db=SimpleNamespace(_get_connection=lambda: conn),
config_manager=SimpleNamespace(get=lambda key, default=None: settings.get(key, default)),
check_stop=lambda: False,
wait_if_paused=lambda: False,
update_progress=lambda *args, **kwargs: None,
report_progress=lambda *args, **kwargs: None,
create_finding=lambda **kwargs: findings.append(kwargs),
findings=findings,
)
def test_missing_cover_art_prefers_explicit_source_over_primary(monkeypatch):
conn = _make_db((1, 'Album', 1, '', 'sp-album', 'it-album', 'dz-album', 'dg-album', 'hy-album'))
context = _make_context(conn, prefer_source='spotify')
deezer_client = _FakeClient(album_image='https://img/deezer-direct')
spotify_client = _FakeClient(album_image='https://img/spotify-direct')
monkeypatch.setattr(mca, 'get_primary_source', lambda: 'deezer')
monkeypatch.setattr(
mca,
'get_client_for_source',
lambda source: {'deezer': deezer_client, 'spotify': spotify_client}.get(source),
)
result = mca.MissingCoverArtJob().scan(context)
assert result.findings_created == 1
assert spotify_client.get_album_calls == [('sp-album', False)]
assert deezer_client.get_album_calls == []
assert context.findings[0]['details']['found_artwork_url'] == 'https://img/spotify-direct'
def test_missing_cover_art_uses_primary_when_prefer_unset(monkeypatch):
conn = _make_db((1, 'Album', 1, '', None, None, None, None, None))
context = _make_context(conn)
discogs_client = _FakeClient(search_image='https://img/discogs-search')
spotify_client = _FakeClient(search_image='https://img/spotify-search')
itunes_client = _FakeClient(search_image='https://img/itunes-search')
monkeypatch.setattr(mca, 'get_primary_source', lambda: 'discogs')
monkeypatch.setattr(
mca,
'get_client_for_source',
lambda source: {'discogs': discogs_client, 'spotify': spotify_client, 'itunes': itunes_client}.get(source),
)
result = mca.MissingCoverArtJob().scan(context)
assert result.findings_created == 1
assert discogs_client.search_calls == [('Artist Album', 1)]
assert spotify_client.search_calls == []
assert itunes_client.search_calls == []
assert context.findings[0]['details']['found_artwork_url'] == 'https://img/discogs-search'
Loading…
Cancel
Save