Refactor artist discography completion metadata flow

Move completion checks into metadata_service and make them follow the configured metadata source priority.

Drop the old test-mode path, remove the web_server wrapper indirection, and keep artist inference on explicit release metadata instead of guessing from a track search.

Add coverage for the source-priority completion behavior and the safer artist-name handling.
pull/324/head
Antti Kettunen 4 weeks ago
parent abb08efe74
commit 17865fe712
No known key found for this signature in database
GPG Key ID: C6B2A3D250359BD7

@ -294,6 +294,7 @@ def _build_discography_release_dict(release: Any, artist_id: str) -> Optional[Di
return {
'id': release_id,
'name': _extract_lookup_value(release, 'name', 'title', default=release_id),
'artist_name': _extract_release_artist_name(release),
'release_date': release_date,
'album_type': album_type,
'image_url': _extract_lookup_value(release, 'image_url', 'thumb_url', 'cover_image'),
@ -302,6 +303,34 @@ def _build_discography_release_dict(release: Any, artist_id: str) -> Optional[Di
}
def _extract_release_artist_name(release: Any) -> str:
artist_name = _extract_lookup_value(release, 'artist_name', 'artist', default='') or ''
artist_name = str(artist_name).strip()
if artist_name:
return artist_name
artists = _extract_lookup_value(release, 'artists', default=[]) or []
if isinstance(artists, (str, bytes)):
return str(artists).strip()
if isinstance(artists, dict):
return str(_extract_lookup_value(artists, 'name', 'artist_name', 'title', default='') or '').strip()
try:
artists = list(artists)
except TypeError:
artists = [artists]
if not artists:
return ''
first_artist = artists[0]
inferred_name = _extract_lookup_value(first_artist, 'name', 'artist_name', 'title')
if not inferred_name and isinstance(first_artist, str):
inferred_name = first_artist
return str(inferred_name).strip() if inferred_name else ''
def _sort_discography_releases(releases: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
def get_release_year(item):
if item.get('release_date'):
@ -394,6 +423,387 @@ def get_artist_discography(
}
def _get_completion_source_chain(source_override: Optional[str] = None) -> List[str]:
primary_source = get_primary_source()
source_chain = list(get_source_priority(primary_source))
override = (source_override or '').strip().lower()
if override:
source_chain = [override] + [source for source in source_chain if source != override]
return source_chain
def _extract_track_items(api_tracks: Any) -> List[Dict[str, Any]]:
if not api_tracks:
return []
if isinstance(api_tracks, dict):
return api_tracks.get('items') or []
if isinstance(api_tracks, list):
return api_tracks
return []
def _resolve_completion_artist_name(
discography: Dict[str, Any],
artist_name: str,
) -> str:
resolved_name = (artist_name or '').strip()
if resolved_name and resolved_name.lower() != 'unknown artist':
return resolved_name
release_items = list((discography or {}).get('albums', []) or []) + list((discography or {}).get('singles', []) or [])
if not release_items:
return resolved_name or 'Unknown Artist'
release_artist_name = _extract_release_artist_name(release_items[0])
if release_artist_name:
logger.debug("Using release artist metadata '%s' for completion", release_artist_name)
return release_artist_name
return resolved_name or 'Unknown Artist'
def _resolve_completion_track_total(release: Dict[str, Any], source_chain: List[str]) -> int:
total_tracks = _extract_lookup_value(release, 'total_tracks', default=0) or 0
if total_tracks:
return int(total_tracks)
release_id = _extract_lookup_value(release, 'id', 'album_id', 'release_id')
if not release_id:
return 0
for source in source_chain:
try:
api_tracks = get_album_tracks_for_source(source, str(release_id))
items = _extract_track_items(api_tracks)
if items:
logger.debug("Resolved track count for release %s from %s", release_id, source)
return len(items)
except Exception as exc:
logger.debug("Could not resolve track count for release %s from %s: %s", release_id, source, exc)
return 0
def check_album_completion(
db,
album_data: Dict[str, Any],
artist_name: str,
source_override: Optional[str] = None,
source_chain: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""Check completion status for a single album."""
try:
source_chain = source_chain or _get_completion_source_chain(source_override)
album_name = album_data.get('name', '')
total_tracks = _resolve_completion_track_total(album_data, source_chain)
album_id = album_data.get('id', '')
# If total_tracks is 0 (Discogs masters don't include track counts),
# try to fetch the real count from the prioritized metadata sources.
if total_tracks == 0 and album_id:
logger.debug("No track count found for '%s' (%s)", album_name, album_id)
print(f"Checking album: '{album_name}' ({total_tracks} tracks)")
formats = []
# Check if album exists in database with completeness info
try:
from config.settings import config_manager
active_server = config_manager.get_active_media_server()
db_album, confidence, owned_tracks, expected_tracks, is_complete, formats = db.check_album_exists_with_completeness(
title=album_name,
artist=artist_name,
expected_track_count=total_tracks if total_tracks > 0 else None,
confidence_threshold=0.7,
server_source=active_server
)
except Exception as db_error:
print(f"Database error for album '{album_name}': {db_error}")
return {
"id": album_id,
"name": album_name,
"status": "error",
"owned_tracks": 0,
"expected_tracks": total_tracks,
"completion_percentage": 0,
"confidence": 0.0,
"found_in_db": False,
"error_message": str(db_error),
"formats": []
}
if expected_tracks > 0:
completion_percentage = (owned_tracks / expected_tracks) * 100
elif total_tracks > 0:
completion_percentage = (owned_tracks / total_tracks) * 100
else:
completion_percentage = 100 if owned_tracks > 0 else 0
if owned_tracks > 0 and owned_tracks >= (expected_tracks or total_tracks):
status = "completed"
elif owned_tracks > 0:
status = "partial"
else:
status = "missing"
print(f" Result: {owned_tracks}/{expected_tracks or total_tracks} tracks ({completion_percentage:.1f}%) - {status}")
return {
"id": album_id,
"name": album_name,
"status": status,
"owned_tracks": owned_tracks,
"expected_tracks": expected_tracks or total_tracks,
"completion_percentage": round(completion_percentage, 1),
"confidence": round(confidence, 2) if confidence else 0.0,
"found_in_db": db_album is not None,
"formats": formats
}
except Exception as e:
print(f"Error checking album completion for '{album_data.get('name', 'Unknown')}': {e}")
return {
"id": album_data.get('id', ''),
"name": album_data.get('name', 'Unknown'),
"status": "error",
"owned_tracks": 0,
"expected_tracks": album_data.get('total_tracks', 0),
"completion_percentage": 0,
"confidence": 0.0,
"found_in_db": False,
"formats": []
}
def check_single_completion(
db,
single_data: Dict[str, Any],
artist_name: str,
source_override: Optional[str] = None,
source_chain: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""Check completion status for a single/EP."""
try:
source_chain = source_chain or _get_completion_source_chain(source_override)
single_name = single_data.get('name', '')
raw_total_tracks = single_data.get('total_tracks', 1)
total_tracks = raw_total_tracks if raw_total_tracks is not None else 1
single_id = single_data.get('id', '')
album_type = single_data.get('album_type', 'single')
formats = []
if total_tracks == 0:
total_tracks = _resolve_completion_track_total(single_data, source_chain) or 1
print(f"Checking {album_type}: '{single_name}' ({total_tracks} tracks)")
if album_type == 'ep' or total_tracks > 1:
try:
from config.settings import config_manager
active_server = config_manager.get_active_media_server()
db_album, confidence, owned_tracks, expected_tracks, is_complete, formats = db.check_album_exists_with_completeness(
title=single_name,
artist=artist_name,
expected_track_count=total_tracks,
confidence_threshold=0.7,
server_source=active_server
)
except Exception as db_error:
print(f"Database error for EP '{single_name}': {db_error}")
owned_tracks, expected_tracks, confidence = 0, total_tracks, 0.0
db_album = None
if expected_tracks > 0:
completion_percentage = (owned_tracks / expected_tracks) * 100
else:
completion_percentage = (owned_tracks / total_tracks) * 100
if owned_tracks > 0 and owned_tracks >= (expected_tracks or total_tracks):
status = "completed"
elif owned_tracks > 0:
status = "partial"
else:
status = "missing"
print(f" EP Result: {owned_tracks}/{expected_tracks or total_tracks} tracks ({completion_percentage:.1f}%) - {status}")
return {
"id": single_id,
"name": single_name,
"status": status,
"owned_tracks": owned_tracks,
"expected_tracks": expected_tracks or total_tracks,
"completion_percentage": round(completion_percentage, 1),
"confidence": round(confidence, 2) if confidence else 0.0,
"found_in_db": db_album is not None,
"type": album_type,
"formats": formats
}
else:
try:
from config.settings import config_manager
active_server = config_manager.get_active_media_server()
db_track, confidence = db.check_track_exists(
title=single_name,
artist=artist_name,
confidence_threshold=0.7,
server_source=active_server
)
except Exception as db_error:
print(f"Database error for single '{single_name}': {db_error}")
db_track, confidence = None, 0.0
owned_tracks = 1 if db_track else 0
expected_tracks = 1
completion_percentage = 100 if db_track else 0
status = "completed" if db_track else "missing"
if db_track and db_track.file_path:
import os
ext = os.path.splitext(db_track.file_path)[1].lstrip('.').upper()
if ext == 'MP3' and db_track.bitrate:
formats = [f"MP3-{db_track.bitrate}"]
elif ext:
formats = [ext]
print(f" Single Result: {owned_tracks}/1 tracks ({completion_percentage:.1f}%) - {status}")
return {
"id": single_id,
"name": single_name,
"status": status,
"owned_tracks": owned_tracks,
"expected_tracks": expected_tracks,
"completion_percentage": round(completion_percentage, 1),
"confidence": round(confidence, 2) if confidence else 0.0,
"found_in_db": db_track is not None,
"type": album_type,
"formats": formats
}
except Exception as e:
print(f"Error checking single/EP completion for '{single_data.get('name', 'Unknown')}': {e}")
return {
"id": single_data.get('id', ''),
"name": single_data.get('name', 'Unknown'),
"status": "error",
"owned_tracks": 0,
"expected_tracks": single_data.get('total_tracks', 1),
"completion_percentage": 0,
"confidence": 0.0,
"found_in_db": False,
"type": single_data.get('album_type', 'single'),
"formats": []
}
def iter_artist_discography_completion_events(
discography: Dict[str, Any],
artist_name: str = 'Unknown Artist',
source_override: Optional[str] = None,
db=None,
):
"""Yield completion-stream events for artist discography ownership checks."""
if db is None:
from database.music_database import get_database
db = get_database()
source_chain = _get_completion_source_chain(source_override)
resolved_artist_name = _resolve_completion_artist_name(discography or {}, artist_name)
albums = list((discography or {}).get('albums', []) or [])
singles = list((discography or {}).get('singles', []) or [])
total_items = len(albums) + len(singles)
processed_count = 0
yield {
'type': 'start',
'total_items': total_items,
'artist_name': resolved_artist_name,
}
for album in albums:
try:
completion_data = check_album_completion(
db,
album,
resolved_artist_name,
source_override=source_override,
source_chain=source_chain,
)
completion_data['type'] = 'album_completion'
completion_data['container_type'] = 'albums'
processed_count += 1
completion_data['progress'] = round((processed_count / total_items) * 100, 1) if total_items else 100
yield completion_data
except Exception as e:
yield {
'type': 'error',
'container_type': 'albums',
'id': album.get('id', ''),
'name': album.get('name', 'Unknown'),
'error': str(e),
}
for single in singles:
try:
completion_data = check_single_completion(
db,
single,
resolved_artist_name,
source_override=source_override,
source_chain=source_chain,
)
completion_data['type'] = 'single_completion'
completion_data['container_type'] = 'singles'
processed_count += 1
completion_data['progress'] = round((processed_count / total_items) * 100, 1) if total_items else 100
yield completion_data
except Exception as e:
yield {
'type': 'error',
'container_type': 'singles',
'id': single.get('id', ''),
'name': single.get('name', 'Unknown'),
'error': str(e),
}
yield {
'type': 'complete',
'processed_count': processed_count,
'artist_name': resolved_artist_name,
}
def check_artist_discography_completion(
discography: Dict[str, Any],
artist_name: str = 'Unknown Artist',
source_override: Optional[str] = None,
db=None,
) -> Dict[str, Any]:
"""Return completion results for an artist discography without streaming."""
albums_completion = []
singles_completion = []
for event in iter_artist_discography_completion_events(
discography,
artist_name=artist_name,
source_override=source_override,
db=db,
):
if event.get('type') == 'album_completion':
albums_completion.append(event)
elif event.get('type') == 'single_completion':
singles_completion.append(event)
return {
'albums': albums_completion,
'singles': singles_completion,
}
def get_deezer_client():
"""Get cached Deezer client.

@ -32,6 +32,9 @@ if "config.settings" not in sys.modules:
def get(self, key, default=None):
return default
def get_active_media_server(self):
return "primary"
settings_mod.config_manager = _DummyConfigManager()
config_pkg.settings = settings_mod
sys.modules["config"] = config_pkg
@ -56,6 +59,7 @@ class _FakeSourceClient:
self.album_calls = []
self.artist_search_calls = []
self.discography_calls = []
self.track_search_calls = []
def get_artist_albums(self, artist_id, **kwargs):
self.album_calls.append((artist_id, dict(kwargs)))
@ -69,6 +73,14 @@ class _FakeSourceClient:
self.discography_calls.append((query, dict(kwargs)))
return list(self.discography_results)
def search_tracks(self, query, **kwargs):
self.track_search_calls.append((query, dict(kwargs)))
return []
def get_album_tracks(self, album_id, **kwargs):
self.album_calls.append((album_id, dict(kwargs)))
return {"items": list(self.album_results)}
def _album(album_id, name, release_date, album_type="album"):
return types.SimpleNamespace(
@ -248,3 +260,126 @@ def test_get_artist_discography_uses_hydrabase_fast_path_when_active(monkeypatch
)
]
assert hydrabase.artist_search_calls == [("Artist One", {"limit": 5})]
class _CompletionFakeDB:
def __init__(self, owned_tracks=1, expected_tracks=3, is_track=False):
self.owned_tracks = owned_tracks
self.expected_tracks = expected_tracks
self.is_track = is_track
self.album_calls = []
self.track_calls = []
def check_album_exists_with_completeness(self, **kwargs):
self.album_calls.append(dict(kwargs))
return (True, 0.9, self.owned_tracks, self.expected_tracks, self.owned_tracks >= self.expected_tracks, [])
def check_track_exists(self, **kwargs):
self.track_calls.append(dict(kwargs))
if self.is_track:
return (object(), 0.9)
return (None, 0.0)
def test_iter_artist_discography_completion_uses_primary_source_first(monkeypatch):
deezer = _FakeSourceClient()
spotify = _FakeSourceClient()
itunes = _FakeSourceClient()
deezer.album_results = [{"id": "release-1-track-1"}, {"id": "release-1-track-2"}]
spotify.album_results = [{"id": "release-1-track-1"}, {"id": "release-1-track-2"}, {"id": "release-1-track-3"}]
itunes.album_results = [{"id": "release-1-track-1"}]
clients = {
"deezer": deezer,
"spotify": spotify,
"itunes": itunes,
}
monkeypatch.setattr(metadata_service, "get_primary_source", lambda: "deezer")
monkeypatch.setattr(metadata_service, "get_source_priority", lambda primary: [primary, "spotify", "itunes"])
monkeypatch.setattr(metadata_service, "get_client_for_source", lambda source: clients.get(source))
db = _CompletionFakeDB(owned_tracks=1, expected_tracks=2)
events = list(metadata_service.iter_artist_discography_completion_events(
{
"albums": [{"id": "release-1", "name": "Album One", "total_tracks": 0}],
"singles": [],
},
artist_name="Artist One",
db=db,
))
assert events[0]["type"] == "start"
assert events[-1]["type"] == "complete"
assert events[1]["expected_tracks"] == 2
assert events[1]["status"] == "partial"
assert deezer.album_calls == [("release-1", {})]
assert spotify.album_calls == []
assert itunes.album_calls == []
assert db.album_calls and db.album_calls[0]["expected_track_count"] == 2
def test_iter_artist_discography_completion_respects_source_override(monkeypatch):
deezer = _FakeSourceClient()
spotify = _FakeSourceClient()
itunes = _FakeSourceClient()
deezer.album_results = [{"id": "release-2-track-1"}]
spotify.album_results = [{"id": "release-2-track-1"}, {"id": "release-2-track-2"}]
itunes.album_results = [{"id": "release-2-track-1"}, {"id": "release-2-track-2"}, {"id": "release-2-track-3"}]
clients = {
"deezer": deezer,
"spotify": spotify,
"itunes": itunes,
}
monkeypatch.setattr(metadata_service, "get_primary_source", lambda: "deezer")
monkeypatch.setattr(metadata_service, "get_source_priority", lambda primary: [primary, "spotify", "itunes"])
monkeypatch.setattr(metadata_service, "get_client_for_source", lambda source: clients.get(source))
db = _CompletionFakeDB(owned_tracks=1, expected_tracks=3)
events = list(metadata_service.iter_artist_discography_completion_events(
{
"albums": [{"id": "release-2", "name": "Album Two", "total_tracks": 0}],
"singles": [],
},
artist_name="Artist Two",
source_override="itunes",
db=db,
))
assert events[1]["expected_tracks"] == 3
assert itunes.album_calls == [("release-2", {})]
assert deezer.album_calls == []
assert spotify.album_calls == []
def test_iter_artist_discography_completion_uses_release_artist_metadata(monkeypatch):
source = _FakeSourceClient()
clients = {"deezer": source}
monkeypatch.setattr(metadata_service, "get_primary_source", lambda: "deezer")
monkeypatch.setattr(metadata_service, "get_source_priority", lambda primary: [primary])
monkeypatch.setattr(metadata_service, "get_client_for_source", lambda source_name: clients.get(source_name))
db = _CompletionFakeDB(owned_tracks=1, expected_tracks=2)
events = list(metadata_service.iter_artist_discography_completion_events(
{
"albums": [{
"id": "release-3",
"name": "Album Three",
"artist_name": "Explicit Artist",
"total_tracks": 2,
}],
"singles": [],
},
artist_name="Unknown Artist",
db=db,
))
assert events[0]["artist_name"] == "Explicit Artist"
assert events[1]["name"] == "Album Three"
assert db.album_calls[0]["artist"] == "Explicit Artist"
assert source.track_search_calls == []

@ -11701,276 +11701,22 @@ def check_artist_discography_completion(artist_id):
data = request.get_json()
if not data or 'discography' not in data:
return jsonify({"error": "Missing discography data"}), 400
from core.metadata_service import check_artist_discography_completion as _check_artist_discography_completion
discography = data['discography']
test_mode = data.get('test_mode', False) # Add test mode for demonstration
albums_completion = []
singles_completion = []
# Get database instance
from database.music_database import MusicDatabase
db = MusicDatabase()
# Get artist name - should be provided by the frontend
artist_name = data.get('artist_name', 'Unknown Artist')
# If no artist name provided, try to infer it from the request
if artist_name == 'Unknown Artist':
print(f"No artist name provided in request, attempting to infer from discography data")
# Try to extract from first album's title by using a simple search
all_items = discography.get('albums', []) + discography.get('singles', [])
if all_items and spotify_client and spotify_client.is_authenticated():
try:
first_item = all_items[0]
# Search for the first track to get artist name
search_results = spotify_client.search_tracks(first_item.get('name', ''), limit=1)
if search_results and len(search_results) > 0:
artist_name = search_results[0].artists[0] if search_results[0].artists else "Unknown Artist"
print(f"Inferred artist name from search: {artist_name}")
except Exception as e:
print(f"Could not infer artist name: {e}")
artist_name = "Unknown Artist"
print(f"Checking completion for artist: {artist_name}")
# Process albums
for album in discography.get('albums', []):
completion_data = _check_album_completion(db, album, artist_name, test_mode)
albums_completion.append(completion_data)
# Process singles/EPs
for single in discography.get('singles', []):
completion_data = _check_single_completion(db, single, artist_name, test_mode)
singles_completion.append(completion_data)
return jsonify({
"albums": albums_completion,
"singles": singles_completion
})
source_override = (data.get('source') or '').strip().lower() or None
result = _check_artist_discography_completion(
discography,
artist_name=data.get('artist_name', 'Unknown Artist'),
source_override=source_override,
)
return jsonify(result)
except Exception as e:
print(f"Error checking discography completion: {e}")
import traceback
traceback.print_exc()
return jsonify({"error": str(e)}), 500
def _check_album_completion(db, album_data: dict, artist_name: str, test_mode: bool = False) -> dict:
"""Check completion status for a single album"""
try:
album_name = album_data.get('name', '')
total_tracks = album_data.get('total_tracks', 0)
album_id = album_data.get('id', '')
# If total_tracks is 0 (Discogs masters don't include track counts),
# try to fetch the real count from the source
if total_tracks == 0 and album_id:
try:
fallback = _get_metadata_fallback_client()
album_detail = fallback.get_album_tracks(str(album_id))
if album_detail and album_detail.get('items'):
total_tracks = len(album_detail['items'])
logger.debug(f"Fetched track count for '{album_name}': {total_tracks}")
except Exception:
pass
print(f"Checking album: '{album_name}' ({total_tracks} tracks)")
formats = []
if test_mode:
# Generate test data to demonstrate the feature
import random
owned_tracks = random.randint(0, max(1, total_tracks))
expected_tracks = total_tracks
confidence = random.uniform(0.7, 1.0)
db_album = True # Simulate found album
print(f"TEST MODE: Simulating {owned_tracks}/{expected_tracks} tracks for '{album_name}'")
else:
# Check if album exists in database with completeness info
try:
# Get active server for database checking
active_server = config_manager.get_active_media_server()
db_album, confidence, owned_tracks, expected_tracks, is_complete, formats = db.check_album_exists_with_completeness(
title=album_name,
artist=artist_name,
expected_track_count=total_tracks if total_tracks > 0 else None,
confidence_threshold=0.7, # Slightly lower threshold for better matching
server_source=active_server # Check only the active server
)
except Exception as db_error:
print(f"Database error for album '{album_name}': {db_error}")
# Return error state for this album
return {
"id": album_id,
"name": album_name,
"status": "error",
"owned_tracks": 0,
"expected_tracks": total_tracks,
"completion_percentage": 0,
"confidence": 0.0,
"found_in_db": False,
"error_message": str(db_error),
"formats": []
}
# Calculate completion percentage
if expected_tracks > 0:
completion_percentage = (owned_tracks / expected_tracks) * 100
elif total_tracks > 0:
completion_percentage = (owned_tracks / total_tracks) * 100
else:
completion_percentage = 100 if owned_tracks > 0 else 0
# Determine completion status — exact match, no percentage rounding
if owned_tracks > 0 and owned_tracks >= (expected_tracks or total_tracks):
status = "completed"
elif owned_tracks > 0:
status = "partial"
else:
status = "missing"
print(f" Result: {owned_tracks}/{expected_tracks or total_tracks} tracks ({completion_percentage:.1f}%) - {status}")
return {
"id": album_id,
"name": album_name,
"status": status,
"owned_tracks": owned_tracks,
"expected_tracks": expected_tracks or total_tracks,
"completion_percentage": round(completion_percentage, 1),
"confidence": round(confidence, 2) if confidence else 0.0,
"found_in_db": db_album is not None,
"formats": formats
}
except Exception as e:
print(f"Error checking album completion for '{album_data.get('name', 'Unknown')}': {e}")
return {
"id": album_data.get('id', ''),
"name": album_data.get('name', 'Unknown'),
"status": "error",
"owned_tracks": 0,
"expected_tracks": album_data.get('total_tracks', 0),
"completion_percentage": 0,
"confidence": 0.0,
"found_in_db": False,
"formats": []
}
def _check_single_completion(db, single_data: dict, artist_name: str, test_mode: bool = False) -> dict:
"""Check completion status for a single/EP (treat EPs like albums, singles as single tracks)"""
try:
single_name = single_data.get('name', '')
total_tracks = single_data.get('total_tracks', 1)
single_id = single_data.get('id', '')
album_type = single_data.get('album_type', 'single')
formats = []
print(f"Checking {album_type}: '{single_name}' ({total_tracks} tracks)")
if test_mode:
# Generate test data for singles/EPs
import random
if album_type == 'ep' or total_tracks > 1:
owned_tracks = random.randint(0, total_tracks)
expected_tracks = total_tracks
confidence = random.uniform(0.7, 1.0)
print(f"TEST MODE: EP with {owned_tracks}/{expected_tracks} tracks")
else:
owned_tracks = random.choice([0, 1]) # 50/50 chance
expected_tracks = 1
confidence = random.uniform(0.7, 1.0) if owned_tracks else 0.0
print(f"TEST MODE: Single with {owned_tracks}/{expected_tracks} tracks")
elif album_type == 'ep' or total_tracks > 1:
# Treat EPs like albums
try:
# Get active server for database checking
active_server = config_manager.get_active_media_server()
db_album, confidence, owned_tracks, expected_tracks, is_complete, formats = db.check_album_exists_with_completeness(
title=single_name,
artist=artist_name,
expected_track_count=total_tracks,
confidence_threshold=0.7,
server_source=active_server # Check only the active server
)
except Exception as db_error:
print(f"Database error for EP '{single_name}': {db_error}")
owned_tracks, expected_tracks, confidence = 0, total_tracks, 0.0
# Calculate completion percentage
if expected_tracks > 0:
completion_percentage = (owned_tracks / expected_tracks) * 100
else:
completion_percentage = (owned_tracks / total_tracks) * 100
# Determine status — exact match, no percentage rounding
if owned_tracks > 0 and owned_tracks >= (expected_tracks or total_tracks):
status = "completed"
elif owned_tracks > 0:
status = "partial"
else:
status = "missing"
print(f" EP Result: {owned_tracks}/{expected_tracks or total_tracks} tracks ({completion_percentage:.1f}%) - {status}")
else:
# Single track - just check if the track exists
try:
active_server = config_manager.get_active_media_server()
db_track, confidence = db.check_track_exists(
title=single_name,
artist=artist_name,
confidence_threshold=0.7,
server_source=active_server
)
except Exception as db_error:
print(f"Database error for single '{single_name}': {db_error}")
db_track, confidence = None, 0.0
owned_tracks = 1 if db_track else 0
expected_tracks = 1
completion_percentage = 100 if db_track else 0
status = "completed" if db_track else "missing"
# Extract format from single track
if db_track and db_track.file_path:
import os
ext = os.path.splitext(db_track.file_path)[1].lstrip('.').upper()
if ext == 'MP3' and db_track.bitrate:
formats = [f"MP3-{db_track.bitrate}"]
elif ext:
formats = [ext]
print(f" Single Result: {owned_tracks}/1 tracks ({completion_percentage:.1f}%) - {status}")
return {
"id": single_id,
"name": single_name,
"status": status,
"owned_tracks": owned_tracks,
"expected_tracks": expected_tracks or total_tracks,
"completion_percentage": round(completion_percentage, 1),
"confidence": round(confidence, 2) if confidence else 0.0,
"found_in_db": (db_album if album_type == 'ep' or total_tracks > 1 else db_track) is not None,
"type": album_type,
"formats": formats
}
except Exception as e:
print(f"Error checking single/EP completion for '{single_data.get('name', 'Unknown')}': {e}")
return {
"id": single_data.get('id', ''),
"name": single_data.get('name', 'Unknown'),
"status": "error",
"owned_tracks": 0,
"expected_tracks": single_data.get('total_tracks', 1),
"completion_percentage": 0,
"confidence": 0.0,
"found_in_db": False,
"type": single_data.get('album_type', 'single'),
"formats": []
}
@app.route('/api/artist/<artist_id>/completion-stream', methods=['POST'])
def check_artist_discography_completion_stream(artist_id):
"""Stream completion status for artist's albums and singles one by one"""
@ -11984,75 +11730,22 @@ def check_artist_discography_completion_stream(artist_id):
# Extract data for the generator
discography = data['discography']
test_mode = data.get('test_mode', False)
artist_name = data.get('artist_name', 'Unknown Artist')
source_override = (data.get('source') or '').strip().lower() or None
from core.metadata_service import iter_artist_discography_completion_events
def generate_completion_stream():
try:
print(f"Starting streaming completion check for artist: {artist_name}")
# Get database instance
from database.music_database import MusicDatabase
db = MusicDatabase()
# Process albums one by one
total_items = len(discography.get('albums', [])) + len(discography.get('singles', []))
processed_count = 0
# Send initial status
yield f"data: {json.dumps({'type': 'start', 'total_items': total_items, 'artist_name': artist_name})}\n\n"
# Process albums
for album in discography.get('albums', []):
try:
completion_data = _check_album_completion(db, album, artist_name, test_mode)
completion_data['type'] = 'album_completion'
completion_data['container_type'] = 'albums'
processed_count += 1
completion_data['progress'] = round((processed_count / total_items) * 100, 1)
yield f"data: {json.dumps(completion_data)}\n\n"
for event in iter_artist_discography_completion_events(
discography,
artist_name=artist_name,
source_override=source_override,
):
yield f"data: {json.dumps(event)}\n\n"
if event.get('type') in ('album_completion', 'single_completion'):
# Small delay to make the streaming effect visible
time.sleep(0.1) # 100ms delay between items
except Exception as e:
error_data = {
'type': 'error',
'container_type': 'albums',
'id': album.get('id', ''),
'name': album.get('name', 'Unknown'),
'error': str(e)
}
yield f"data: {json.dumps(error_data)}\n\n"
# Process singles/EPs
for single in discography.get('singles', []):
try:
completion_data = _check_single_completion(db, single, artist_name, test_mode)
completion_data['type'] = 'single_completion'
completion_data['container_type'] = 'singles'
processed_count += 1
completion_data['progress'] = round((processed_count / total_items) * 100, 1)
yield f"data: {json.dumps(completion_data)}\n\n"
# Small delay to make the streaming effect visible
time.sleep(0.1) # 100ms delay between items
except Exception as e:
error_data = {
'type': 'error',
'container_type': 'singles',
'id': single.get('id', ''),
'name': single.get('name', 'Unknown'),
'error': str(e)
}
yield f"data: {json.dumps(error_data)}\n\n"
# Send completion signal
yield f"data: {json.dumps({'type': 'complete', 'processed_count': processed_count})}\n\n"
except Exception as e:
print(f"Error in streaming completion check: {e}")
import traceback
@ -12084,8 +11777,8 @@ def library_completion_stream():
def generate():
try:
from database.music_database import MusicDatabase
db = MusicDatabase()
from core.metadata_service import check_album_completion, check_single_completion
db = get_database()
categories = ['albums', 'eps', 'singles']
all_items = []
@ -12106,9 +11799,9 @@ def library_completion_stream():
}
if category == 'singles':
result = _check_single_completion(db, mapped, artist_name)
result = check_single_completion(db, mapped, artist_name)
else:
result = _check_album_completion(db, mapped, artist_name)
result = check_album_completion(db, mapped, artist_name)
result['spotify_id'] = item.get('spotify_id', '')
result['category'] = category

@ -35807,7 +35807,8 @@ async function loadArtistDiscography(artistId, artistName = null, sourceOverride
const discography = {
albums: data.albums || [],
singles: data.singles || []
singles: data.singles || [],
source: data.source || sourceOverride || null,
};
// Update selected artist with full details from backend (includes MusicBrainz ID)
@ -36303,7 +36304,7 @@ async function checkDiscographyCompletion(artistId, discography) {
body: JSON.stringify({
discography: discography,
artist_name: artistsPageState.selectedArtist?.name || 'Unknown Artist',
test_mode: window.location.search.includes('test=true')
source: discography?.source || artistsPageState.sourceOverride || null,
}),
signal: artistCompletionController.signal
});

Loading…
Cancel
Save