From 17865fe7121d39add38172f74dd34e7d4798dff7 Mon Sep 17 00:00:00 2001 From: Antti Kettunen Date: Sun, 19 Apr 2026 15:23:48 +0300 Subject: [PATCH] Refactor artist discography completion metadata flow Move completion checks into metadata_service and make them follow the configured metadata source priority. Drop the old test-mode path, remove the web_server wrapper indirection, and keep artist inference on explicit release metadata instead of guessing from a track search. Add coverage for the source-priority completion behavior and the safer artist-name handling. --- core/metadata_service.py | 410 +++++++++++++++++++++ tests/test_metadata_service_discography.py | 135 +++++++ web_server.py | 353 ++---------------- webui/static/script.js | 5 +- 4 files changed, 571 insertions(+), 332 deletions(-) diff --git a/core/metadata_service.py b/core/metadata_service.py index fcd516e1..6313f3ad 100644 --- a/core/metadata_service.py +++ b/core/metadata_service.py @@ -294,6 +294,7 @@ def _build_discography_release_dict(release: Any, artist_id: str) -> Optional[Di return { 'id': release_id, 'name': _extract_lookup_value(release, 'name', 'title', default=release_id), + 'artist_name': _extract_release_artist_name(release), 'release_date': release_date, 'album_type': album_type, 'image_url': _extract_lookup_value(release, 'image_url', 'thumb_url', 'cover_image'), @@ -302,6 +303,34 @@ def _build_discography_release_dict(release: Any, artist_id: str) -> Optional[Di } +def _extract_release_artist_name(release: Any) -> str: + artist_name = _extract_lookup_value(release, 'artist_name', 'artist', default='') or '' + artist_name = str(artist_name).strip() + if artist_name: + return artist_name + + artists = _extract_lookup_value(release, 'artists', default=[]) or [] + if isinstance(artists, (str, bytes)): + return str(artists).strip() + if isinstance(artists, dict): + return str(_extract_lookup_value(artists, 'name', 'artist_name', 'title', default='') or '').strip() + + try: + artists = list(artists) + except TypeError: + artists = [artists] + + if not artists: + return '' + + first_artist = artists[0] + inferred_name = _extract_lookup_value(first_artist, 'name', 'artist_name', 'title') + if not inferred_name and isinstance(first_artist, str): + inferred_name = first_artist + + return str(inferred_name).strip() if inferred_name else '' + + def _sort_discography_releases(releases: List[Dict[str, Any]]) -> List[Dict[str, Any]]: def get_release_year(item): if item.get('release_date'): @@ -394,6 +423,387 @@ def get_artist_discography( } +def _get_completion_source_chain(source_override: Optional[str] = None) -> List[str]: + primary_source = get_primary_source() + source_chain = list(get_source_priority(primary_source)) + + override = (source_override or '').strip().lower() + if override: + source_chain = [override] + [source for source in source_chain if source != override] + + return source_chain + + +def _extract_track_items(api_tracks: Any) -> List[Dict[str, Any]]: + if not api_tracks: + return [] + if isinstance(api_tracks, dict): + return api_tracks.get('items') or [] + if isinstance(api_tracks, list): + return api_tracks + return [] + + +def _resolve_completion_artist_name( + discography: Dict[str, Any], + artist_name: str, +) -> str: + resolved_name = (artist_name or '').strip() + if resolved_name and resolved_name.lower() != 'unknown artist': + return resolved_name + + release_items = list((discography or {}).get('albums', []) or []) + list((discography or {}).get('singles', []) or []) + if not release_items: + return resolved_name or 'Unknown Artist' + + release_artist_name = _extract_release_artist_name(release_items[0]) + if release_artist_name: + logger.debug("Using release artist metadata '%s' for completion", release_artist_name) + return release_artist_name + + return resolved_name or 'Unknown Artist' + + +def _resolve_completion_track_total(release: Dict[str, Any], source_chain: List[str]) -> int: + total_tracks = _extract_lookup_value(release, 'total_tracks', default=0) or 0 + if total_tracks: + return int(total_tracks) + + release_id = _extract_lookup_value(release, 'id', 'album_id', 'release_id') + if not release_id: + return 0 + + for source in source_chain: + try: + api_tracks = get_album_tracks_for_source(source, str(release_id)) + items = _extract_track_items(api_tracks) + if items: + logger.debug("Resolved track count for release %s from %s", release_id, source) + return len(items) + except Exception as exc: + logger.debug("Could not resolve track count for release %s from %s: %s", release_id, source, exc) + + return 0 + + +def check_album_completion( + db, + album_data: Dict[str, Any], + artist_name: str, + source_override: Optional[str] = None, + source_chain: Optional[List[str]] = None, +) -> Dict[str, Any]: + """Check completion status for a single album.""" + try: + source_chain = source_chain or _get_completion_source_chain(source_override) + album_name = album_data.get('name', '') + total_tracks = _resolve_completion_track_total(album_data, source_chain) + album_id = album_data.get('id', '') + + # If total_tracks is 0 (Discogs masters don't include track counts), + # try to fetch the real count from the prioritized metadata sources. + if total_tracks == 0 and album_id: + logger.debug("No track count found for '%s' (%s)", album_name, album_id) + + print(f"Checking album: '{album_name}' ({total_tracks} tracks)") + + formats = [] + # Check if album exists in database with completeness info + try: + from config.settings import config_manager + active_server = config_manager.get_active_media_server() + db_album, confidence, owned_tracks, expected_tracks, is_complete, formats = db.check_album_exists_with_completeness( + title=album_name, + artist=artist_name, + expected_track_count=total_tracks if total_tracks > 0 else None, + confidence_threshold=0.7, + server_source=active_server + ) + except Exception as db_error: + print(f"Database error for album '{album_name}': {db_error}") + return { + "id": album_id, + "name": album_name, + "status": "error", + "owned_tracks": 0, + "expected_tracks": total_tracks, + "completion_percentage": 0, + "confidence": 0.0, + "found_in_db": False, + "error_message": str(db_error), + "formats": [] + } + + if expected_tracks > 0: + completion_percentage = (owned_tracks / expected_tracks) * 100 + elif total_tracks > 0: + completion_percentage = (owned_tracks / total_tracks) * 100 + else: + completion_percentage = 100 if owned_tracks > 0 else 0 + + if owned_tracks > 0 and owned_tracks >= (expected_tracks or total_tracks): + status = "completed" + elif owned_tracks > 0: + status = "partial" + else: + status = "missing" + + print(f" Result: {owned_tracks}/{expected_tracks or total_tracks} tracks ({completion_percentage:.1f}%) - {status}") + + return { + "id": album_id, + "name": album_name, + "status": status, + "owned_tracks": owned_tracks, + "expected_tracks": expected_tracks or total_tracks, + "completion_percentage": round(completion_percentage, 1), + "confidence": round(confidence, 2) if confidence else 0.0, + "found_in_db": db_album is not None, + "formats": formats + } + + except Exception as e: + print(f"Error checking album completion for '{album_data.get('name', 'Unknown')}': {e}") + return { + "id": album_data.get('id', ''), + "name": album_data.get('name', 'Unknown'), + "status": "error", + "owned_tracks": 0, + "expected_tracks": album_data.get('total_tracks', 0), + "completion_percentage": 0, + "confidence": 0.0, + "found_in_db": False, + "formats": [] + } + + +def check_single_completion( + db, + single_data: Dict[str, Any], + artist_name: str, + source_override: Optional[str] = None, + source_chain: Optional[List[str]] = None, +) -> Dict[str, Any]: + """Check completion status for a single/EP.""" + try: + source_chain = source_chain or _get_completion_source_chain(source_override) + single_name = single_data.get('name', '') + raw_total_tracks = single_data.get('total_tracks', 1) + total_tracks = raw_total_tracks if raw_total_tracks is not None else 1 + single_id = single_data.get('id', '') + album_type = single_data.get('album_type', 'single') + formats = [] + + if total_tracks == 0: + total_tracks = _resolve_completion_track_total(single_data, source_chain) or 1 + + print(f"Checking {album_type}: '{single_name}' ({total_tracks} tracks)") + + if album_type == 'ep' or total_tracks > 1: + try: + from config.settings import config_manager + active_server = config_manager.get_active_media_server() + db_album, confidence, owned_tracks, expected_tracks, is_complete, formats = db.check_album_exists_with_completeness( + title=single_name, + artist=artist_name, + expected_track_count=total_tracks, + confidence_threshold=0.7, + server_source=active_server + ) + except Exception as db_error: + print(f"Database error for EP '{single_name}': {db_error}") + owned_tracks, expected_tracks, confidence = 0, total_tracks, 0.0 + db_album = None + + if expected_tracks > 0: + completion_percentage = (owned_tracks / expected_tracks) * 100 + else: + completion_percentage = (owned_tracks / total_tracks) * 100 + + if owned_tracks > 0 and owned_tracks >= (expected_tracks or total_tracks): + status = "completed" + elif owned_tracks > 0: + status = "partial" + else: + status = "missing" + + print(f" EP Result: {owned_tracks}/{expected_tracks or total_tracks} tracks ({completion_percentage:.1f}%) - {status}") + + return { + "id": single_id, + "name": single_name, + "status": status, + "owned_tracks": owned_tracks, + "expected_tracks": expected_tracks or total_tracks, + "completion_percentage": round(completion_percentage, 1), + "confidence": round(confidence, 2) if confidence else 0.0, + "found_in_db": db_album is not None, + "type": album_type, + "formats": formats + } + else: + try: + from config.settings import config_manager + active_server = config_manager.get_active_media_server() + db_track, confidence = db.check_track_exists( + title=single_name, + artist=artist_name, + confidence_threshold=0.7, + server_source=active_server + ) + except Exception as db_error: + print(f"Database error for single '{single_name}': {db_error}") + db_track, confidence = None, 0.0 + + owned_tracks = 1 if db_track else 0 + expected_tracks = 1 + completion_percentage = 100 if db_track else 0 + status = "completed" if db_track else "missing" + + if db_track and db_track.file_path: + import os + ext = os.path.splitext(db_track.file_path)[1].lstrip('.').upper() + if ext == 'MP3' and db_track.bitrate: + formats = [f"MP3-{db_track.bitrate}"] + elif ext: + formats = [ext] + + print(f" Single Result: {owned_tracks}/1 tracks ({completion_percentage:.1f}%) - {status}") + + return { + "id": single_id, + "name": single_name, + "status": status, + "owned_tracks": owned_tracks, + "expected_tracks": expected_tracks, + "completion_percentage": round(completion_percentage, 1), + "confidence": round(confidence, 2) if confidence else 0.0, + "found_in_db": db_track is not None, + "type": album_type, + "formats": formats + } + + except Exception as e: + print(f"Error checking single/EP completion for '{single_data.get('name', 'Unknown')}': {e}") + return { + "id": single_data.get('id', ''), + "name": single_data.get('name', 'Unknown'), + "status": "error", + "owned_tracks": 0, + "expected_tracks": single_data.get('total_tracks', 1), + "completion_percentage": 0, + "confidence": 0.0, + "found_in_db": False, + "type": single_data.get('album_type', 'single'), + "formats": [] + } + + +def iter_artist_discography_completion_events( + discography: Dict[str, Any], + artist_name: str = 'Unknown Artist', + source_override: Optional[str] = None, + db=None, +): + """Yield completion-stream events for artist discography ownership checks.""" + if db is None: + from database.music_database import get_database + + db = get_database() + source_chain = _get_completion_source_chain(source_override) + resolved_artist_name = _resolve_completion_artist_name(discography or {}, artist_name) + + albums = list((discography or {}).get('albums', []) or []) + singles = list((discography or {}).get('singles', []) or []) + total_items = len(albums) + len(singles) + processed_count = 0 + + yield { + 'type': 'start', + 'total_items': total_items, + 'artist_name': resolved_artist_name, + } + + for album in albums: + try: + completion_data = check_album_completion( + db, + album, + resolved_artist_name, + source_override=source_override, + source_chain=source_chain, + ) + completion_data['type'] = 'album_completion' + completion_data['container_type'] = 'albums' + processed_count += 1 + completion_data['progress'] = round((processed_count / total_items) * 100, 1) if total_items else 100 + yield completion_data + except Exception as e: + yield { + 'type': 'error', + 'container_type': 'albums', + 'id': album.get('id', ''), + 'name': album.get('name', 'Unknown'), + 'error': str(e), + } + + for single in singles: + try: + completion_data = check_single_completion( + db, + single, + resolved_artist_name, + source_override=source_override, + source_chain=source_chain, + ) + completion_data['type'] = 'single_completion' + completion_data['container_type'] = 'singles' + processed_count += 1 + completion_data['progress'] = round((processed_count / total_items) * 100, 1) if total_items else 100 + yield completion_data + except Exception as e: + yield { + 'type': 'error', + 'container_type': 'singles', + 'id': single.get('id', ''), + 'name': single.get('name', 'Unknown'), + 'error': str(e), + } + + yield { + 'type': 'complete', + 'processed_count': processed_count, + 'artist_name': resolved_artist_name, + } + + +def check_artist_discography_completion( + discography: Dict[str, Any], + artist_name: str = 'Unknown Artist', + source_override: Optional[str] = None, + db=None, +) -> Dict[str, Any]: + """Return completion results for an artist discography without streaming.""" + albums_completion = [] + singles_completion = [] + + for event in iter_artist_discography_completion_events( + discography, + artist_name=artist_name, + source_override=source_override, + db=db, + ): + if event.get('type') == 'album_completion': + albums_completion.append(event) + elif event.get('type') == 'single_completion': + singles_completion.append(event) + + return { + 'albums': albums_completion, + 'singles': singles_completion, + } + + def get_deezer_client(): """Get cached Deezer client. diff --git a/tests/test_metadata_service_discography.py b/tests/test_metadata_service_discography.py index 7267b5bd..81d2a8c8 100644 --- a/tests/test_metadata_service_discography.py +++ b/tests/test_metadata_service_discography.py @@ -32,6 +32,9 @@ if "config.settings" not in sys.modules: def get(self, key, default=None): return default + def get_active_media_server(self): + return "primary" + settings_mod.config_manager = _DummyConfigManager() config_pkg.settings = settings_mod sys.modules["config"] = config_pkg @@ -56,6 +59,7 @@ class _FakeSourceClient: self.album_calls = [] self.artist_search_calls = [] self.discography_calls = [] + self.track_search_calls = [] def get_artist_albums(self, artist_id, **kwargs): self.album_calls.append((artist_id, dict(kwargs))) @@ -69,6 +73,14 @@ class _FakeSourceClient: self.discography_calls.append((query, dict(kwargs))) return list(self.discography_results) + def search_tracks(self, query, **kwargs): + self.track_search_calls.append((query, dict(kwargs))) + return [] + + def get_album_tracks(self, album_id, **kwargs): + self.album_calls.append((album_id, dict(kwargs))) + return {"items": list(self.album_results)} + def _album(album_id, name, release_date, album_type="album"): return types.SimpleNamespace( @@ -248,3 +260,126 @@ def test_get_artist_discography_uses_hydrabase_fast_path_when_active(monkeypatch ) ] assert hydrabase.artist_search_calls == [("Artist One", {"limit": 5})] + + +class _CompletionFakeDB: + def __init__(self, owned_tracks=1, expected_tracks=3, is_track=False): + self.owned_tracks = owned_tracks + self.expected_tracks = expected_tracks + self.is_track = is_track + self.album_calls = [] + self.track_calls = [] + + def check_album_exists_with_completeness(self, **kwargs): + self.album_calls.append(dict(kwargs)) + return (True, 0.9, self.owned_tracks, self.expected_tracks, self.owned_tracks >= self.expected_tracks, []) + + def check_track_exists(self, **kwargs): + self.track_calls.append(dict(kwargs)) + if self.is_track: + return (object(), 0.9) + return (None, 0.0) + + +def test_iter_artist_discography_completion_uses_primary_source_first(monkeypatch): + deezer = _FakeSourceClient() + spotify = _FakeSourceClient() + itunes = _FakeSourceClient() + + deezer.album_results = [{"id": "release-1-track-1"}, {"id": "release-1-track-2"}] + spotify.album_results = [{"id": "release-1-track-1"}, {"id": "release-1-track-2"}, {"id": "release-1-track-3"}] + itunes.album_results = [{"id": "release-1-track-1"}] + + clients = { + "deezer": deezer, + "spotify": spotify, + "itunes": itunes, + } + + monkeypatch.setattr(metadata_service, "get_primary_source", lambda: "deezer") + monkeypatch.setattr(metadata_service, "get_source_priority", lambda primary: [primary, "spotify", "itunes"]) + monkeypatch.setattr(metadata_service, "get_client_for_source", lambda source: clients.get(source)) + + db = _CompletionFakeDB(owned_tracks=1, expected_tracks=2) + events = list(metadata_service.iter_artist_discography_completion_events( + { + "albums": [{"id": "release-1", "name": "Album One", "total_tracks": 0}], + "singles": [], + }, + artist_name="Artist One", + db=db, + )) + + assert events[0]["type"] == "start" + assert events[-1]["type"] == "complete" + assert events[1]["expected_tracks"] == 2 + assert events[1]["status"] == "partial" + assert deezer.album_calls == [("release-1", {})] + assert spotify.album_calls == [] + assert itunes.album_calls == [] + assert db.album_calls and db.album_calls[0]["expected_track_count"] == 2 + + +def test_iter_artist_discography_completion_respects_source_override(monkeypatch): + deezer = _FakeSourceClient() + spotify = _FakeSourceClient() + itunes = _FakeSourceClient() + + deezer.album_results = [{"id": "release-2-track-1"}] + spotify.album_results = [{"id": "release-2-track-1"}, {"id": "release-2-track-2"}] + itunes.album_results = [{"id": "release-2-track-1"}, {"id": "release-2-track-2"}, {"id": "release-2-track-3"}] + + clients = { + "deezer": deezer, + "spotify": spotify, + "itunes": itunes, + } + + monkeypatch.setattr(metadata_service, "get_primary_source", lambda: "deezer") + monkeypatch.setattr(metadata_service, "get_source_priority", lambda primary: [primary, "spotify", "itunes"]) + monkeypatch.setattr(metadata_service, "get_client_for_source", lambda source: clients.get(source)) + + db = _CompletionFakeDB(owned_tracks=1, expected_tracks=3) + events = list(metadata_service.iter_artist_discography_completion_events( + { + "albums": [{"id": "release-2", "name": "Album Two", "total_tracks": 0}], + "singles": [], + }, + artist_name="Artist Two", + source_override="itunes", + db=db, + )) + + assert events[1]["expected_tracks"] == 3 + assert itunes.album_calls == [("release-2", {})] + assert deezer.album_calls == [] + assert spotify.album_calls == [] + + +def test_iter_artist_discography_completion_uses_release_artist_metadata(monkeypatch): + source = _FakeSourceClient() + clients = {"deezer": source} + + monkeypatch.setattr(metadata_service, "get_primary_source", lambda: "deezer") + monkeypatch.setattr(metadata_service, "get_source_priority", lambda primary: [primary]) + monkeypatch.setattr(metadata_service, "get_client_for_source", lambda source_name: clients.get(source_name)) + + db = _CompletionFakeDB(owned_tracks=1, expected_tracks=2) + events = list(metadata_service.iter_artist_discography_completion_events( + { + "albums": [{ + "id": "release-3", + "name": "Album Three", + "artist_name": "Explicit Artist", + "total_tracks": 2, + }], + "singles": [], + }, + artist_name="Unknown Artist", + db=db, + )) + + assert events[0]["artist_name"] == "Explicit Artist" + assert events[1]["name"] == "Album Three" + assert db.album_calls[0]["artist"] == "Explicit Artist" + assert source.track_search_calls == [] diff --git a/web_server.py b/web_server.py index e807e1c5..7eed2bc6 100644 --- a/web_server.py +++ b/web_server.py @@ -11701,276 +11701,22 @@ def check_artist_discography_completion(artist_id): data = request.get_json() if not data or 'discography' not in data: return jsonify({"error": "Missing discography data"}), 400 - + from core.metadata_service import check_artist_discography_completion as _check_artist_discography_completion + discography = data['discography'] - test_mode = data.get('test_mode', False) # Add test mode for demonstration - albums_completion = [] - singles_completion = [] - - # Get database instance - from database.music_database import MusicDatabase - db = MusicDatabase() - - # Get artist name - should be provided by the frontend - artist_name = data.get('artist_name', 'Unknown Artist') - - # If no artist name provided, try to infer it from the request - if artist_name == 'Unknown Artist': - print(f"No artist name provided in request, attempting to infer from discography data") - # Try to extract from first album's title by using a simple search - all_items = discography.get('albums', []) + discography.get('singles', []) - if all_items and spotify_client and spotify_client.is_authenticated(): - try: - first_item = all_items[0] - # Search for the first track to get artist name - search_results = spotify_client.search_tracks(first_item.get('name', ''), limit=1) - if search_results and len(search_results) > 0: - artist_name = search_results[0].artists[0] if search_results[0].artists else "Unknown Artist" - print(f"Inferred artist name from search: {artist_name}") - except Exception as e: - print(f"Could not infer artist name: {e}") - artist_name = "Unknown Artist" - - print(f"Checking completion for artist: {artist_name}") - - # Process albums - for album in discography.get('albums', []): - completion_data = _check_album_completion(db, album, artist_name, test_mode) - albums_completion.append(completion_data) - - # Process singles/EPs - for single in discography.get('singles', []): - completion_data = _check_single_completion(db, single, artist_name, test_mode) - singles_completion.append(completion_data) - - return jsonify({ - "albums": albums_completion, - "singles": singles_completion - }) - + source_override = (data.get('source') or '').strip().lower() or None + result = _check_artist_discography_completion( + discography, + artist_name=data.get('artist_name', 'Unknown Artist'), + source_override=source_override, + ) + return jsonify(result) except Exception as e: print(f"Error checking discography completion: {e}") import traceback traceback.print_exc() return jsonify({"error": str(e)}), 500 -def _check_album_completion(db, album_data: dict, artist_name: str, test_mode: bool = False) -> dict: - """Check completion status for a single album""" - try: - album_name = album_data.get('name', '') - total_tracks = album_data.get('total_tracks', 0) - album_id = album_data.get('id', '') - - # If total_tracks is 0 (Discogs masters don't include track counts), - # try to fetch the real count from the source - if total_tracks == 0 and album_id: - try: - fallback = _get_metadata_fallback_client() - album_detail = fallback.get_album_tracks(str(album_id)) - if album_detail and album_detail.get('items'): - total_tracks = len(album_detail['items']) - logger.debug(f"Fetched track count for '{album_name}': {total_tracks}") - except Exception: - pass - - print(f"Checking album: '{album_name}' ({total_tracks} tracks)") - - formats = [] - if test_mode: - # Generate test data to demonstrate the feature - import random - owned_tracks = random.randint(0, max(1, total_tracks)) - expected_tracks = total_tracks - confidence = random.uniform(0.7, 1.0) - db_album = True # Simulate found album - print(f"TEST MODE: Simulating {owned_tracks}/{expected_tracks} tracks for '{album_name}'") - else: - # Check if album exists in database with completeness info - try: - # Get active server for database checking - active_server = config_manager.get_active_media_server() - db_album, confidence, owned_tracks, expected_tracks, is_complete, formats = db.check_album_exists_with_completeness( - title=album_name, - artist=artist_name, - expected_track_count=total_tracks if total_tracks > 0 else None, - confidence_threshold=0.7, # Slightly lower threshold for better matching - server_source=active_server # Check only the active server - ) - except Exception as db_error: - print(f"Database error for album '{album_name}': {db_error}") - # Return error state for this album - return { - "id": album_id, - "name": album_name, - "status": "error", - "owned_tracks": 0, - "expected_tracks": total_tracks, - "completion_percentage": 0, - "confidence": 0.0, - "found_in_db": False, - "error_message": str(db_error), - "formats": [] - } - - # Calculate completion percentage - if expected_tracks > 0: - completion_percentage = (owned_tracks / expected_tracks) * 100 - elif total_tracks > 0: - completion_percentage = (owned_tracks / total_tracks) * 100 - else: - completion_percentage = 100 if owned_tracks > 0 else 0 - - # Determine completion status — exact match, no percentage rounding - if owned_tracks > 0 and owned_tracks >= (expected_tracks or total_tracks): - status = "completed" - elif owned_tracks > 0: - status = "partial" - else: - status = "missing" - - print(f" Result: {owned_tracks}/{expected_tracks or total_tracks} tracks ({completion_percentage:.1f}%) - {status}") - - return { - "id": album_id, - "name": album_name, - "status": status, - "owned_tracks": owned_tracks, - "expected_tracks": expected_tracks or total_tracks, - "completion_percentage": round(completion_percentage, 1), - "confidence": round(confidence, 2) if confidence else 0.0, - "found_in_db": db_album is not None, - "formats": formats - } - - except Exception as e: - print(f"Error checking album completion for '{album_data.get('name', 'Unknown')}': {e}") - return { - "id": album_data.get('id', ''), - "name": album_data.get('name', 'Unknown'), - "status": "error", - "owned_tracks": 0, - "expected_tracks": album_data.get('total_tracks', 0), - "completion_percentage": 0, - "confidence": 0.0, - "found_in_db": False, - "formats": [] - } - -def _check_single_completion(db, single_data: dict, artist_name: str, test_mode: bool = False) -> dict: - """Check completion status for a single/EP (treat EPs like albums, singles as single tracks)""" - try: - single_name = single_data.get('name', '') - total_tracks = single_data.get('total_tracks', 1) - single_id = single_data.get('id', '') - album_type = single_data.get('album_type', 'single') - formats = [] - - print(f"Checking {album_type}: '{single_name}' ({total_tracks} tracks)") - - if test_mode: - # Generate test data for singles/EPs - import random - if album_type == 'ep' or total_tracks > 1: - owned_tracks = random.randint(0, total_tracks) - expected_tracks = total_tracks - confidence = random.uniform(0.7, 1.0) - print(f"TEST MODE: EP with {owned_tracks}/{expected_tracks} tracks") - else: - owned_tracks = random.choice([0, 1]) # 50/50 chance - expected_tracks = 1 - confidence = random.uniform(0.7, 1.0) if owned_tracks else 0.0 - print(f"TEST MODE: Single with {owned_tracks}/{expected_tracks} tracks") - elif album_type == 'ep' or total_tracks > 1: - # Treat EPs like albums - try: - # Get active server for database checking - active_server = config_manager.get_active_media_server() - db_album, confidence, owned_tracks, expected_tracks, is_complete, formats = db.check_album_exists_with_completeness( - title=single_name, - artist=artist_name, - expected_track_count=total_tracks, - confidence_threshold=0.7, - server_source=active_server # Check only the active server - ) - except Exception as db_error: - print(f"Database error for EP '{single_name}': {db_error}") - owned_tracks, expected_tracks, confidence = 0, total_tracks, 0.0 - - # Calculate completion percentage - if expected_tracks > 0: - completion_percentage = (owned_tracks / expected_tracks) * 100 - else: - completion_percentage = (owned_tracks / total_tracks) * 100 - - # Determine status — exact match, no percentage rounding - if owned_tracks > 0 and owned_tracks >= (expected_tracks or total_tracks): - status = "completed" - elif owned_tracks > 0: - status = "partial" - else: - status = "missing" - - print(f" EP Result: {owned_tracks}/{expected_tracks or total_tracks} tracks ({completion_percentage:.1f}%) - {status}") - - else: - # Single track - just check if the track exists - try: - active_server = config_manager.get_active_media_server() - db_track, confidence = db.check_track_exists( - title=single_name, - artist=artist_name, - confidence_threshold=0.7, - server_source=active_server - ) - except Exception as db_error: - print(f"Database error for single '{single_name}': {db_error}") - db_track, confidence = None, 0.0 - - owned_tracks = 1 if db_track else 0 - expected_tracks = 1 - completion_percentage = 100 if db_track else 0 - - status = "completed" if db_track else "missing" - - # Extract format from single track - if db_track and db_track.file_path: - import os - ext = os.path.splitext(db_track.file_path)[1].lstrip('.').upper() - if ext == 'MP3' and db_track.bitrate: - formats = [f"MP3-{db_track.bitrate}"] - elif ext: - formats = [ext] - - print(f" Single Result: {owned_tracks}/1 tracks ({completion_percentage:.1f}%) - {status}") - - return { - "id": single_id, - "name": single_name, - "status": status, - "owned_tracks": owned_tracks, - "expected_tracks": expected_tracks or total_tracks, - "completion_percentage": round(completion_percentage, 1), - "confidence": round(confidence, 2) if confidence else 0.0, - "found_in_db": (db_album if album_type == 'ep' or total_tracks > 1 else db_track) is not None, - "type": album_type, - "formats": formats - } - - except Exception as e: - print(f"Error checking single/EP completion for '{single_data.get('name', 'Unknown')}': {e}") - return { - "id": single_data.get('id', ''), - "name": single_data.get('name', 'Unknown'), - "status": "error", - "owned_tracks": 0, - "expected_tracks": single_data.get('total_tracks', 1), - "completion_percentage": 0, - "confidence": 0.0, - "found_in_db": False, - "type": single_data.get('album_type', 'single'), - "formats": [] - } - @app.route('/api/artist//completion-stream', methods=['POST']) def check_artist_discography_completion_stream(artist_id): """Stream completion status for artist's albums and singles one by one""" @@ -11984,75 +11730,22 @@ def check_artist_discography_completion_stream(artist_id): # Extract data for the generator discography = data['discography'] - test_mode = data.get('test_mode', False) artist_name = data.get('artist_name', 'Unknown Artist') - + source_override = (data.get('source') or '').strip().lower() or None + from core.metadata_service import iter_artist_discography_completion_events + def generate_completion_stream(): try: print(f"Starting streaming completion check for artist: {artist_name}") - - # Get database instance - from database.music_database import MusicDatabase - db = MusicDatabase() - - # Process albums one by one - total_items = len(discography.get('albums', [])) + len(discography.get('singles', [])) - processed_count = 0 - - # Send initial status - yield f"data: {json.dumps({'type': 'start', 'total_items': total_items, 'artist_name': artist_name})}\n\n" - - # Process albums - for album in discography.get('albums', []): - try: - completion_data = _check_album_completion(db, album, artist_name, test_mode) - completion_data['type'] = 'album_completion' - completion_data['container_type'] = 'albums' - processed_count += 1 - completion_data['progress'] = round((processed_count / total_items) * 100, 1) - - yield f"data: {json.dumps(completion_data)}\n\n" - + for event in iter_artist_discography_completion_events( + discography, + artist_name=artist_name, + source_override=source_override, + ): + yield f"data: {json.dumps(event)}\n\n" + if event.get('type') in ('album_completion', 'single_completion'): # Small delay to make the streaming effect visible time.sleep(0.1) # 100ms delay between items - - except Exception as e: - error_data = { - 'type': 'error', - 'container_type': 'albums', - 'id': album.get('id', ''), - 'name': album.get('name', 'Unknown'), - 'error': str(e) - } - yield f"data: {json.dumps(error_data)}\n\n" - - # Process singles/EPs - for single in discography.get('singles', []): - try: - completion_data = _check_single_completion(db, single, artist_name, test_mode) - completion_data['type'] = 'single_completion' - completion_data['container_type'] = 'singles' - processed_count += 1 - completion_data['progress'] = round((processed_count / total_items) * 100, 1) - - yield f"data: {json.dumps(completion_data)}\n\n" - - # Small delay to make the streaming effect visible - time.sleep(0.1) # 100ms delay between items - - except Exception as e: - error_data = { - 'type': 'error', - 'container_type': 'singles', - 'id': single.get('id', ''), - 'name': single.get('name', 'Unknown'), - 'error': str(e) - } - yield f"data: {json.dumps(error_data)}\n\n" - - # Send completion signal - yield f"data: {json.dumps({'type': 'complete', 'processed_count': processed_count})}\n\n" - except Exception as e: print(f"Error in streaming completion check: {e}") import traceback @@ -12084,8 +11777,8 @@ def library_completion_stream(): def generate(): try: - from database.music_database import MusicDatabase - db = MusicDatabase() + from core.metadata_service import check_album_completion, check_single_completion + db = get_database() categories = ['albums', 'eps', 'singles'] all_items = [] @@ -12106,9 +11799,9 @@ def library_completion_stream(): } if category == 'singles': - result = _check_single_completion(db, mapped, artist_name) + result = check_single_completion(db, mapped, artist_name) else: - result = _check_album_completion(db, mapped, artist_name) + result = check_album_completion(db, mapped, artist_name) result['spotify_id'] = item.get('spotify_id', '') result['category'] = category diff --git a/webui/static/script.js b/webui/static/script.js index 524fbe69..81c84b60 100644 --- a/webui/static/script.js +++ b/webui/static/script.js @@ -35807,7 +35807,8 @@ async function loadArtistDiscography(artistId, artistName = null, sourceOverride const discography = { albums: data.albums || [], - singles: data.singles || [] + singles: data.singles || [], + source: data.source || sourceOverride || null, }; // Update selected artist with full details from backend (includes MusicBrainz ID) @@ -36303,7 +36304,7 @@ async function checkDiscographyCompletion(artistId, discography) { body: JSON.stringify({ discography: discography, artist_name: artistsPageState.selectedArtist?.name || 'Unknown Artist', - test_mode: window.location.search.includes('test=true') + source: discography?.source || artistsPageState.sourceOverride || null, }), signal: artistCompletionController.signal });