diff --git a/core/playlists/__init__.py b/core/playlists/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/core/playlists/explorer.py b/core/playlists/explorer.py new file mode 100644 index 00000000..991f6915 --- /dev/null +++ b/core/playlists/explorer.py @@ -0,0 +1,363 @@ +"""Playlist explorer build-tree route. + +`playlist_explorer_build_tree(deps)` is the body of the +`POST /api/playlist-explorer/build-tree` route. It builds a discovery +tree from a mirrored playlist and streams the result as NDJSON +(one JSON object per artist line + a final 'complete' line). + +Works with Spotify (preferred), iTunes, or Deezer as the metadata +source. Uses and populates the metadata cache to avoid redundant API +calls per discography fetch. + +Two operating modes: +- `albums`: only show releases that overlap with the playlist's tracks. +- `discographies`: show the full discography of every artist in the + playlist, with `in_playlist` flag on the matching releases. + +Per-artist flow inside the streaming generator: +1. Resolve discography via `_fetch_artist_discography` (cache → fall + through to live API search). +2. Tag each release with `in_playlist` based on title-similarity match + against the playlist's track/album names. +3. Apply mode filter, sort by in-playlist-first then year DESC. +4. Yield one JSON line per artist. + +The route returns Flask's streaming `Response` wrapper around the NDJSON +generator. Early-exit cases (bad request, playlist not found, top-level +exception) yield via Flask's standard `jsonify(...), status` shape. + +Lifted verbatim from web_server.py. Wide dependency surface (Flask +`request` + `Response`, Spotify client, multiple metadata helpers, +DB access, metadata cache) all injected via `PlaylistExplorerDeps`. 
logger = logging.getLogger(__name__)


@dataclass
class PlaylistExplorerDeps:
    """Bundle of cross-cutting deps the playlist explorer needs.

    Everything the route body touches outside this module is injected here so
    the function can be driven by plain fakes in tests.
    """
    # --- Flask glue ---
    request: Any  # flask.request proxy; only `.get_json()` is called
    flask_response: Any  # flask.Response constructor; receives (generator, mimetype=..., headers=...)
    flask_jsonify: Any  # flask.jsonify; used for the early-exit error payloads
    # --- Metadata sources ---
    spotify_client: Any  # used only when the active source is 'spotify' and it reports authenticated
    get_database: Callable[[], Any]
    get_active_discovery_source: Callable[[], str]
    get_metadata_fallback_client: Callable[[], Any]
    get_metadata_fallback_source: Callable[[], str]
    get_metadata_cache: Callable[[], Any]  # may return a falsy value, in which case caching is skipped


def playlist_explorer_build_tree(deps: PlaylistExplorerDeps):
    """Build a discovery tree from a mirrored playlist and stream it as NDJSON.

    Reads ``playlist_id`` and ``mode`` (``'albums'`` or ``'discographies'``,
    default ``'albums'``) from the JSON request body, groups the playlist's
    tracks by artist, resolves every artist's discography through the active
    metadata client (metadata cache first) and emits one JSON line per artist
    between a leading ``meta`` line and a trailing ``complete`` line.

    Args:
        deps: Injected collaborators (request, response factory, clients,
            database and cache accessors).

    Returns:
        ``deps.flask_response(generator, mimetype='application/x-ndjson', ...)``
        on success, otherwise a ``(jsonify_payload, status)`` tuple with
        status 400 (bad request / empty playlist), 404 (unknown playlist) or
        500 (unexpected error while preparing the stream).
    """
    # Function-scope import: `re` is not part of this module's import block.
    import re

    # One bracketed/parenthesised qualifier plus the whitespace before it,
    # e.g. " (Deluxe Edition)" or " [Live]". Compiled once per request.
    qualifier_re = re.compile(r'\s*[\(\[][^)\]]*[\)\]]')

    try:
        data = deps.request.get_json()
        if not data:
            return deps.flask_jsonify({"success": False, "error": "No data provided"}), 400

        playlist_id = data.get('playlist_id')
        mode = data.get('mode', 'albums')  # 'albums' or 'discographies'

        if not playlist_id:
            return deps.flask_jsonify({"success": False, "error": "playlist_id is required"}), 400
        if mode not in ('albums', 'discographies'):
            return deps.flask_jsonify({"success": False, "error": "mode must be 'albums' or 'discographies'"}), 400

        database = deps.get_database()
        playlist = database.get_mirrored_playlist(playlist_id)
        if not playlist:
            return deps.flask_jsonify({"success": False, "error": "Playlist not found"}), 404

        tracks = database.get_mirrored_playlist_tracks(playlist_id)
        if not tracks:
            return deps.flask_jsonify({"success": False, "error": "Playlist has no tracks"}), 400

        # Determine active metadata source — respect user's configured primary
        source_name = deps.get_active_discovery_source()
        if source_name == 'spotify' and deps.spotify_client and deps.spotify_client.is_spotify_authenticated():
            active_client = deps.spotify_client
        else:
            active_client = deps.get_metadata_fallback_client()
            source_name = deps.get_metadata_fallback_source()

        cache = deps.get_metadata_cache()

        # Parse extra_data and group tracks by artist using discovered data
        artist_groups = {}
        for t in tracks:
            extra = {}
            raw_extra = t.get('extra_data')
            if raw_extra:
                try:
                    extra = json.loads(raw_extra) if isinstance(raw_extra, str) else raw_extra
                except (json.JSONDecodeError, TypeError):
                    extra = {}
            # Valid JSON that is not an object (e.g. "null", a list) carries no
            # discovery info; treating it as empty avoids an AttributeError
            # that would abort the whole request.
            if not isinstance(extra, dict):
                extra = {}

            # Only use discovery data if it matches the active metadata source
            is_discovered = extra.get('discovered', False)
            provider = (extra.get('provider') or '').lower()
            source_matches = provider == source_name or (provider in ('itunes', 'apple') and source_name == 'itunes')

            matched = extra.get('matched_data') if (is_discovered and source_matches) else None
            if not isinstance(matched, dict):
                matched = {}
            artists_list = matched.get('artists') or []
            primary_artist = artists_list[0] if artists_list else None
            # Artists can be dicts {"name": "X", "id": "Y"} or plain strings "X"
            raw_artist_name = (t.get('artist_name') or '').strip()
            if isinstance(primary_artist, dict):
                artist_name = primary_artist.get('name') or raw_artist_name
                artist_id = primary_artist.get('id') or None
            elif isinstance(primary_artist, str):
                artist_name = primary_artist or raw_artist_name
                artist_id = None
            else:
                artist_name = raw_artist_name
                artist_id = None

            if not artist_name:
                continue

            key = artist_name.lower()
            if key not in artist_groups:
                artist_groups[key] = {
                    'name': artist_name,
                    'artist_id': artist_id,  # Pre-resolved from discovery
                    'tracks': [],
                    'album_names': set(),
                    'discovered': is_discovered,
                }
            group = artist_groups[key]
            # If we get an artist_id from a later track but didn't have one before, fill it in
            if artist_id and not group.get('artist_id'):
                group['artist_id'] = artist_id

            group['tracks'].append(t.get('track_name', ''))
            # Get album name from discovered data or playlist field
            album_name = ''
            album_data = matched.get('album')
            if isinstance(album_data, dict) and album_data.get('name'):
                album_name = album_data['name']
            elif (t.get('album_name') or '').strip():
                album_name = t['album_name'].strip()
            if album_name:
                group['album_names'].add(album_name)

        def _normalize_for_match(title):
            """Lower-case *title* and drop bracketed qualifiers for comparison.

            Returns '' for missing or non-string titles. When stripping the
            qualifiers would leave nothing (e.g. "(Intro)"), the bare
            lower-cased title is returned instead so it can still match
            exactly rather than collapsing to an empty string.
            """
            if not isinstance(title, str):
                return ''
            bare = title.strip().lower()
            return qualifier_re.sub('', title).strip().lower() or bare

        def _year_sort_value(release):
            """Return the release year as an int, or 0 when absent/non-numeric."""
            try:
                return int(release.get('year') or 0)
            except (TypeError, ValueError):
                return 0

        def _fetch_artist_discography(artist_name, known_artist_id=None):
            """Fetch discography using the active client. Checks cache first, stores results after.

            If known_artist_id is provided (from discovery data), the name
            search is skipped. Returns a dict with ``success`` True plus the
            release list, or ``success`` False plus an ``error`` message.
            """
            # Check cache for this artist's discography
            cache_key = f"explorer_disco_{artist_name.lower().strip()}"
            cached = cache.get_entity(source_name, 'artist_discography', cache_key) if cache else None
            if cached and isinstance(cached, dict) and cached.get('albums'):
                logger.debug(f"Explorer: cache hit for '{artist_name}' discography")
                return cached

            artist_id = known_artist_id
            artist_image = None

            if artist_id:
                # Already have the ID from discovery — just fetch the artist image
                try:
                    artist_info = active_client.get_artist(artist_id)
                    if artist_info:
                        if isinstance(artist_info, dict):
                            images = artist_info.get('images') or []
                            artist_image = images[0].get('url') if images else None
                        elif hasattr(artist_info, 'image_url'):
                            artist_image = artist_info.image_url
                except Exception as e:
                    # Best-effort: a missing image must not fail the artist
                    logger.debug("Explorer: artist image lookup failed for '%s': %s", artist_name, e)
            else:
                # No pre-resolved ID — search by name
                try:
                    search_results = active_client.search_artists(artist_name, limit=5)
                except Exception as e:
                    return {'success': False, 'error': f'Search failed: {e}'}

                if not search_results:
                    return {'success': False, 'error': f'"{artist_name}" not found'}

                # Find best match (exact first, then first search hit)
                wanted = artist_name.lower().strip()
                best = None
                for a in search_results:
                    if (a.name or '').lower().strip() == wanted:
                        best = a
                        break
                if not best:
                    best = search_results[0]

                artist_id = best.id
                artist_image = best.image_url if hasattr(best, 'image_url') else None

            # Fetch albums
            try:
                # skip_cache only supported by spotify_client — other clients don't cache this call
                _skip = {'skip_cache': True} if hasattr(active_client, 'sp') else {}
                all_albums = active_client.get_artist_albums(artist_id, album_type='album,single', **_skip)
            except Exception as e:
                return {'success': False, 'error': f'Album fetch failed: {e}'}

            if not all_albums:
                return {'success': False, 'error': 'No albums found'}

            # Check which albums the user already owns
            owned_titles = set()
            try:
                db = deps.get_database()
                with db._get_connection() as conn:
                    cursor = conn.cursor()
                    # Find all artists in DB matching this name
                    cursor.execute("SELECT id FROM artists WHERE LOWER(name) = LOWER(?)", (artist_name,))
                    artist_rows = cursor.fetchall()
                    for ar in artist_rows:
                        cursor.execute("SELECT title FROM albums WHERE artist_id = ?", (ar['id'],))
                        for alb_row in cursor.fetchall():
                            owned_titles.add((alb_row['title'] or '').strip().lower())
            except Exception as e:
                # Non-critical — owned badges just won't show
                logger.debug("Explorer: owned-album lookup failed for '%s': %s", artist_name, e)

            # Build release list
            releases = []
            for album in all_albums:
                # Skip albums where this artist isn't primary
                if hasattr(album, 'artist_ids') and album.artist_ids and album.artist_ids[0] != artist_id:
                    continue
                releases.append({
                    'title': album.name,
                    'year': album.release_date[:4] if album.release_date else None,
                    'image_url': album.image_url,
                    'spotify_id': album.id,
                    'track_count': album.total_tracks,
                    'album_type': (album.album_type or 'album').lower(),
                    'owned': (album.name or '').strip().lower() in owned_titles,
                })

            result = {
                'success': True,
                'name': artist_name,  # Required for metadata cache validation
                'albums': releases,
                'artist_image': artist_image,
                'artist_id': artist_id,
                'artist_name': artist_name,
            }

            # Store in cache
            if cache and releases:
                try:
                    cache.store_entity(source_name, 'artist_discography', cache_key, result)
                except Exception as e:
                    logger.debug("Explorer: could not cache discography for '%s': %s", artist_name, e)

            return result

        def generate():
            """Yield the NDJSON stream: meta line, one line per artist, complete line."""
            yield json.dumps({
                "type": "meta",
                "playlist_name": playlist.get('name', 'Unknown Playlist'),
                "playlist_image": playlist.get('image_url', ''),
                "total_artists": len(artist_groups),
                "total_tracks": len(tracks),
                "source": source_name,
            }) + '\n'

            total_albums = 0

            for idx, (_key, group) in enumerate(artist_groups.items()):
                artist_name = group['name']
                playlist_track_names = group['tracks']
                playlist_album_names = group['album_names']

                try:
                    disco = _fetch_artist_discography(artist_name, group.get('artist_id'))

                    if not disco.get('success'):
                        yield json.dumps({
                            "type": "artist",
                            "name": artist_name,
                            "artist_id": None,
                            "image_url": None,
                            "playlist_tracks": playlist_track_names,
                            "albums": [],
                            "error": disco.get('error', 'Not found'),
                        }) + '\n'
                        time.sleep(0.1)
                        continue

                    # Tag each release with in_playlist flag
                    # If no album names available, fall back to matching track names against single titles
                    match_names = playlist_album_names
                    if not match_names:
                        match_names = set(playlist_track_names)

                    # Normalise once per artist and drop empty results: an empty
                    # string is a substring of every title and would flag every
                    # release as in-playlist.
                    norm_names = {n for n in map(_normalize_for_match, match_names) if n}

                    all_releases = []
                    for release in disco.get('albums', []):
                        r = dict(release)
                        norm_title = _normalize_for_match(r.get('title'))
                        # Substring containment in either direction (covers equality too)
                        r['in_playlist'] = bool(norm_title) and any(
                            norm_title in n or n in norm_title
                            for n in norm_names
                        )
                        all_releases.append(r)

                    # Filter based on mode
                    if mode == 'albums':
                        filtered = [r for r in all_releases if r['in_playlist']]
                    else:
                        filtered = all_releases

                    # In-playlist releases first, then newest year first
                    filtered.sort(key=lambda r: (not r.get('in_playlist', False), -_year_sort_value(r)))
                    total_albums += len(filtered)

                    yield json.dumps({
                        "type": "artist",
                        "name": disco.get('artist_name', artist_name),
                        "artist_id": disco.get('artist_id'),
                        "image_url": disco.get('artist_image'),
                        "playlist_tracks": playlist_track_names,
                        "albums": filtered,
                    }) + '\n'

                except Exception as e:
                    logger.error(f"Explorer: error processing artist '{artist_name}': {e}")
                    yield json.dumps({
                        "type": "artist",
                        "name": artist_name,
                        "artist_id": None,
                        "image_url": None,
                        "playlist_tracks": playlist_track_names,
                        "albums": [],
                        "error": str(e),
                    }) + '\n'

                # Rate limit protection between artists
                if idx < len(artist_groups) - 1:
                    time.sleep(0.2)

            # The artist lines are already delivered; a bookkeeping failure
            # must not swallow the final 'complete' line.
            try:
                deps.get_database().mark_mirrored_playlist_explored(playlist_id)
            except Exception as e:
                logger.error(f"Explorer: could not mark playlist '{playlist_id}' as explored: {e}")
            yield json.dumps({"type": "complete", "total_artists": len(artist_groups), "total_albums": total_albums}) + '\n'

        return deps.flask_response(generate(), mimetype='application/x-ndjson', headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no',
        })

    except Exception as e:
        # logger.exception records the traceback through the logging config
        logger.exception(f"Playlist Explorer build-tree error: {e}")
        return deps.flask_jsonify({"success": False, "error": str(e)}), 500
logger = logging.getLogger(__name__)


@dataclass
class PrepareStreamDeps:
    """Bundle of cross-cutting deps the stream-prep worker needs.

    `stream_state` is exposed as a property backed by the injected getter and
    setter so the worker can keep using ``deps.stream_state[...]`` /
    ``deps.stream_state.update(...)`` against the caller-owned dict.
    """
    config_manager: Any  # only `.get(key, default)` is called
    soulseek_client: Any  # awaited: download(), get_all_downloads(), signal_download_completion()
    stream_lock: Any  # threading.Lock
    project_root: str  # absolute path to web_server.py's directory
    docker_resolve_path: Callable[[str], str]
    find_streaming_download_in_all_downloads: Callable
    find_downloaded_file: Callable
    extract_filename: Callable[[str], str]
    cleanup_empty_directories: Callable
    _get_stream_state: Callable[[], dict]
    _set_stream_state: Callable[[dict], None]

    @property
    def stream_state(self) -> dict:
        """Return the current stream-state dict from the injected getter."""
        return self._get_stream_state()

    @stream_state.setter
    def stream_state(self, value: dict) -> None:
        """Forward a wholesale reassignment to the injected setter."""
        self._set_stream_state(value)


def prepare_stream_task(track_data, deps: PrepareStreamDeps):
    """Download a track and stage it in the Stream/ folder for playback.

    Resets ``deps.stream_state`` to ``loading``, empties
    ``<project_root>/Stream``, starts the download on a private asyncio event
    loop and polls the download list every 1.5 s (15 s queue timeout, 60 s
    overall budget counted in polls). On completion the file is located,
    moved into Stream/ and the state becomes ``ready`` with ``file_path`` set;
    every failure path sets ``error`` (or ``stopped`` on cancellation) with an
    explanatory ``error_message``.

    Args:
        track_data: Mapping with at least ``username``, ``filename`` and
            optionally ``size``; stored verbatim as ``track_info``.
        deps: Injected collaborators and the shared stream state.

    Returns:
        None. The outcome is reported through ``deps.stream_state``.
    """
    loop = None
    queue_start_time = None
    actively_downloading = False
    last_progress_sent = 0.0

    try:
        logger.info(f"Starting stream preparation for: {track_data.get('filename')}")

        # Update state to loading
        with deps.stream_lock:
            deps.stream_state.update({
                "status": "loading",
                "progress": 0,
                "track_info": track_data,
                "file_path": None,
                "error_message": None
            })

        # Get paths
        download_path = deps.docker_resolve_path(deps.config_manager.get('soulseek.download_path', './downloads'))
        project_root = deps.project_root
        stream_folder = os.path.join(project_root, 'Stream')

        # Ensure Stream directory exists
        os.makedirs(stream_folder, exist_ok=True)

        # Clear any existing files in Stream folder (only one file at a time)
        for existing_file in glob.glob(os.path.join(stream_folder, '*')):
            try:
                if os.path.isfile(existing_file):
                    os.remove(existing_file)
                elif os.path.isdir(existing_file):
                    shutil.rmtree(existing_file)
                logger.info(f"Cleared old stream file: {existing_file}")
            except Exception as e:
                logger.error(f"Could not remove existing stream file: {e}")

        # Start the download using the same mechanism as regular downloads
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            download_result = loop.run_until_complete(deps.soulseek_client.download(
                track_data.get('username'),
                track_data.get('filename'),
                track_data.get('size', 0)
            ))

            if not download_result:
                with deps.stream_lock:
                    deps.stream_state.update({
                        "status": "error",
                        "error_message": "Failed to initiate download - uploader may be offline"
                    })
                return

            logger.info("Download initiated for streaming")

            # Enhanced monitoring with queue timeout detection (matching GUI)
            max_wait_time = 60  # Increased timeout
            poll_interval = 1.5  # More frequent polling
            queue_timeout = 15  # Queue timeout like GUI
            wait_count = 0

            while wait_count * poll_interval < max_wait_time:
                wait_count += 1

                # Check download progress via orchestrator (works for Soulseek and YouTube)
                api_progress = None
                download_state = None
                download_status = None

                try:
                    # Use orchestrator's get_all_downloads() which works for both sources
                    all_downloads = loop.run_until_complete(deps.soulseek_client.get_all_downloads())
                    download_status = deps.find_streaming_download_in_all_downloads(all_downloads, track_data)

                    if download_status:
                        # Any of these fields may be present but null. Coerce them so
                        # the formatting and comparisons below cannot raise — an
                        # exception here is swallowed by the handler at the bottom
                        # of the loop and would stall a finished download until the
                        # overall timeout.
                        api_progress = download_status.get('percentComplete') or 0
                        original_state = download_status.get('state') or ''
                        download_state = original_state.lower()

                        logger.info(f"API Download - State: {original_state}, Progress: {api_progress:.1f}%")

                        # Track queue state timing (matching GUI logic)
                        is_queued = ('queued' in download_state or 'initializing' in download_state)
                        is_downloading = ('inprogress' in download_state or 'transferring' in download_state)
                        # Verify bytes match before trusting state/progress
                        _stream_expected = download_status.get('size') or 0
                        _stream_transferred = download_status.get('bytesTransferred') or 0
                        _bytes_ok = _stream_expected <= 0 or _stream_transferred >= _stream_expected
                        is_completed = ('succeeded' in download_state or api_progress >= 100) and _bytes_ok

                        # Handle queue state timing
                        if is_queued and queue_start_time is None:
                            queue_start_time = time.time()
                            logger.info(f"Download entered queue state: {original_state}")
                            with deps.stream_lock:
                                deps.stream_state["status"] = "queued"
                        elif is_downloading and not actively_downloading:
                            actively_downloading = True
                            queue_start_time = None  # Reset queue timer
                            logger.info(f"Download started actively downloading: {original_state}")
                            with deps.stream_lock:
                                deps.stream_state["status"] = "loading"

                        # Check for queue timeout (matching GUI)
                        if is_queued and queue_start_time:
                            queue_elapsed = time.time() - queue_start_time
                            if queue_elapsed > queue_timeout:
                                logger.error(f"⏰ Queue timeout after {queue_elapsed:.1f}s - download stuck in queue")
                                with deps.stream_lock:
                                    deps.stream_state.update({
                                        "status": "error",
                                        "error_message": "Queue timeout - uploader not responding. Try another source."
                                    })
                                return

                        # Update progress
                        with deps.stream_lock:
                            if api_progress != last_progress_sent:
                                deps.stream_state["progress"] = api_progress
                                last_progress_sent = api_progress

                        # Check if download is complete
                        if is_completed:
                            logger.info(f"Download completed via API status: {original_state}")

                            # Wait for file to stabilise on disk before moving:
                            # stop once two consecutive size reads agree and are non-zero
                            found_file = deps.find_downloaded_file(download_path, track_data)
                            if found_file:
                                _prev_sz = -1
                                for _sc in range(4):
                                    try:
                                        _cur_sz = os.path.getsize(found_file)
                                    except OSError:
                                        _cur_sz = -1
                                    if _cur_sz == _prev_sz and _cur_sz > 0:
                                        break
                                    _prev_sz = _cur_sz
                                    time.sleep(1.5)

                            # Re-find in case it wasn't found on first try
                            if not found_file:
                                found_file = deps.find_downloaded_file(download_path, track_data)

                            # Retry file search a few times (matching GUI logic)
                            retry_attempts = 5
                            for attempt in range(retry_attempts):
                                if found_file:
                                    break
                                logger.warning(f"File not found yet, attempt {attempt + 1}/{retry_attempts}")
                                time.sleep(1)
                                found_file = deps.find_downloaded_file(download_path, track_data)

                            if found_file:
                                logger.debug(f"Found downloaded file: {found_file}")

                                # Move file to Stream folder
                                original_filename = deps.extract_filename(found_file)
                                stream_path = os.path.join(stream_folder, original_filename)

                                try:
                                    shutil.move(found_file, stream_path)
                                    logger.debug(f"Moved file to stream folder: {stream_path}")

                                    # Clean up empty directories (matching GUI)
                                    deps.cleanup_empty_directories(download_path, found_file)

                                    # Update state to ready
                                    with deps.stream_lock:
                                        deps.stream_state.update({
                                            "status": "ready",
                                            "progress": 100,
                                            "file_path": stream_path
                                        })

                                    # Clean up download from slskd API
                                    try:
                                        download_id = download_status.get('id', '')
                                        if download_id and track_data.get('username'):
                                            success = loop.run_until_complete(
                                                deps.soulseek_client.signal_download_completion(
                                                    download_id, track_data.get('username'), remove=True)
                                            )
                                            if success:
                                                logger.debug(f"Cleaned up download {download_id} from API")
                                    except Exception as e:
                                        logger.error(f"Error cleaning up download: {e}")

                                    logger.info(f"Stream file ready for playback: {stream_path}")
                                    return  # Success!

                                except Exception as e:
                                    logger.error(f"Error moving file to stream folder: {e}")
                                    with deps.stream_lock:
                                        deps.stream_state.update({
                                            "status": "error",
                                            "error_message": f"Failed to prepare stream file: {e}"
                                        })
                                    return
                            else:
                                logger.error("Could not find downloaded file after completion")
                                with deps.stream_lock:
                                    deps.stream_state.update({
                                        "status": "error",
                                        "error_message": "Download completed but file not found"
                                    })
                                return
                    else:
                        # No transfer found in API - may still be initializing
                        logger.debug(f"No transfer found in API yet... (elapsed: {wait_count * poll_interval}s)")

                except Exception as e:
                    logger.error(f"Error checking download progress: {e}")
                    # Continue to next iteration if API call fails

                # Wait before next poll
                time.sleep(poll_interval)

            # If we get here, download timed out
            logger.warning(f"Download timed out after {max_wait_time}s")
            with deps.stream_lock:
                deps.stream_state.update({
                    "status": "error",
                    "error_message": "Download timed out - try a different source"
                })

        except asyncio.CancelledError:
            logger.warning("Stream task cancelled")
            with deps.stream_lock:
                deps.stream_state.update({
                    "status": "stopped",
                    "error_message": None
                })
        finally:
            if loop:
                try:
                    # Clean up any pending tasks
                    pending = asyncio.all_tasks(loop)
                    if pending:
                        loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
                    loop.close()
                    # Do not leave a closed loop installed as this thread's current loop
                    asyncio.set_event_loop(None)
                except Exception as e:
                    logger.error(f"Error cleaning up streaming event loop: {e}")

    except Exception as e:
        logger.error(f"Stream preparation failed: {e}")
        with deps.stream_lock:
            deps.stream_state.update({
                "status": "error",
                "error_message": f"Streaming error: {str(e)}"
            })
# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------

class _FakeRequest:
    """Stand-in for flask.request; `get_json()` returns the payload given at construction."""

    def __init__(self, payload):
        self._payload = payload

    def get_json(self):
        return self._payload


class _FakeResponse:
    """Captures the streaming generator + headers.

    The generator is drained eagerly into `body_lines`, so `generator` itself
    is already exhausted once construction returns.
    """

    def __init__(self, generator, mimetype=None, headers=None):
        self.generator = generator
        self.mimetype = mimetype
        self.headers = headers or {}
        self.body_lines = list(generator)


def _fake_jsonify(payload):
    """Return the payload dict as-is so tests can assert on it directly."""
    return payload


@dataclass
class _FakeAlbum:
    """Album record exposing the attributes the explorer reads when building releases."""
    id: str
    name: str
    release_date: str = '2024-01-01'
    image_url: str = ''
    total_tracks: int = 10
    album_type: str = 'album'
    artist_ids: list | None = None  # None means "no primary-artist filtering"


class _FakeArtistMeta:
    """Artist search hit with `name`, `id` and `image_url`."""

    def __init__(self, name='Artist', id='a-1'):
        self.name = name
        self.id = id
        self.image_url = 'http://art-img'


class _FakeSpotify:
    """Metadata client returning canned search results and albums."""

    def __init__(self, *, authenticated=True, search_results=None, albums=None):
        self._authenticated = authenticated
        self._search_results = search_results or []
        self._albums = albums or []
        self.sp = object()  # pretends to be spotify (skip_cache support)

    def is_spotify_authenticated(self):
        return self._authenticated

    def search_artists(self, name, limit=5):
        return self._search_results

    def get_artist_albums(self, artist_id, album_type='album,single', skip_cache=False):
        return self._albums

    def get_artist(self, artist_id):
        return None


class _FakeCursor:
    """Records executed statements; every query returns no rows."""

    def __init__(self):
        self.queries = []

    def execute(self, sql, params=None):
        self.queries.append((sql, params))

    def fetchall(self):
        return []


class _FakeConn:
    """Context-manager connection handing out a single `_FakeCursor`."""

    def __init__(self):
        self.cur = _FakeCursor()

    def cursor(self):
        return self.cur

    def __enter__(self):
        return self

    def __exit__(self, *args):
        return False


class _FakeDB:
    """Database fake serving one playlist and its tracks; tracks the explored flag."""

    def __init__(self, playlist=None, tracks=None):
        self._playlist = playlist
        self._tracks = tracks or []
        self.marked_explored = False

    def get_mirrored_playlist(self, pid):
        return self._playlist

    def get_mirrored_playlist_tracks(self, pid):
        return self._tracks

    def mark_mirrored_playlist_explored(self, pid):
        self.marked_explored = True

    def _get_connection(self):
        return _FakeConn()


class _FakeCache:
    """In-memory metadata cache keyed by (source, kind, key)."""

    def __init__(self):
        self.entries = {}

    def get_entity(self, source, kind, key):
        return self.entries.get((source, kind, key))

    def store_entity(self, source, kind, key, value):
        self.entries[(source, kind, key)] = value


def _build_deps(
    *,
    request_payload=None,
    spotify=None,
    db=None,
    discovery_source='spotify',
    fallback_source='itunes',
    cache=None,
):
    """Assemble `PlaylistExplorerDeps` from fakes.

    Defaults are created once and shared by every getter, so repeated
    `get_database()` / `get_metadata_cache()` calls observe the same object.
    `request_payload` is passed through unchanged; the default `None` models
    a request without a JSON body.
    """
    spotify = spotify if spotify is not None else _FakeSpotify()
    db = db if db is not None else _FakeDB()
    cache = cache if cache is not None else _FakeCache()
    return ex.PlaylistExplorerDeps(
        request=_FakeRequest(request_payload),
        flask_response=_FakeResponse,
        flask_jsonify=_fake_jsonify,
        spotify_client=spotify,
        get_database=lambda: db,
        get_active_discovery_source=lambda: discovery_source,
        get_metadata_fallback_client=lambda: spotify,
        get_metadata_fallback_source=lambda: fallback_source,
        get_metadata_cache=lambda: cache,
    )


def _artist_lines(response):
    """Decode each NDJSON body line once and keep only the 'artist' records."""
    decoded = [json.loads(line) for line in response.body_lines]
    return [entry for entry in decoded if entry.get('type') == 'artist']


# ---------------------------------------------------------------------------
# Validation early exits
# ---------------------------------------------------------------------------

def test_no_data_returns_400():
    """Empty/None request body → 400."""
    deps = _build_deps(request_payload=None)
    payload, status = ex.playlist_explorer_build_tree(deps)
    assert status == 400
    assert payload == {"success": False, "error": "No data provided"}


def test_missing_playlist_id_returns_400():
    """Payload without playlist_id → 400."""
    deps = _build_deps(request_payload={'mode': 'albums'})
    payload, status = ex.playlist_explorer_build_tree(deps)
    assert status == 400
    assert 'playlist_id' in payload['error']


def test_invalid_mode_returns_400():
    """mode != 'albums'/'discographies' → 400."""
    deps = _build_deps(request_payload={'playlist_id': '1', 'mode': 'invalid'})
    payload, status = ex.playlist_explorer_build_tree(deps)
    assert status == 400
    assert "'albums' or 'discographies'" in payload['error']


def test_playlist_not_found_returns_404():
    """Database returns no playlist for the given ID → 404."""
    db = _FakeDB(playlist=None)
    deps = _build_deps(request_payload={'playlist_id': '99'}, db=db)
    payload, status = ex.playlist_explorer_build_tree(deps)
    assert status == 404
    assert payload == {"success": False, "error": "Playlist not found"}


def test_playlist_with_no_tracks_returns_400():
    """Playlist found but has no tracks → 400."""
    db = _FakeDB(playlist={'name': 'P', 'image_url': ''}, tracks=[])
    deps = _build_deps(request_payload={'playlist_id': '1'}, db=db)
    payload, status = ex.playlist_explorer_build_tree(deps)
    assert status == 400
    assert payload == {"success": False, "error": "Playlist has no tracks"}


# ---------------------------------------------------------------------------
# Streaming response
# ---------------------------------------------------------------------------

def test_success_returns_streaming_response_with_meta_line():
    """Successful build → Response wrapper with NDJSON generator that starts with 'meta'."""
    db = _FakeDB(
        playlist={'name': 'My Playlist', 'image_url': 'http://img'},
        tracks=[{'track_name': 'T1', 'artist_name': 'Artist One', 'album_name': 'Album X', 'extra_data': None}],
    )
    spotify = _FakeSpotify(
        search_results=[_FakeArtistMeta(name='Artist One', id='a-1')],
        albums=[_FakeAlbum(id='alb-1', name='Album X', release_date='2024')],
    )
    deps = _build_deps(
        request_payload={'playlist_id': '1', 'mode': 'discographies'},
        spotify=spotify,
        db=db,
    )

    response = ex.playlist_explorer_build_tree(deps)

    # _FakeResponse exposes body_lines pre-collected from the generator
    assert response.mimetype == 'application/x-ndjson'
    lines = response.body_lines
    assert len(lines) >= 2

    # First line should be meta
    first = json.loads(lines[0])
    assert first['type'] == 'meta'
    assert first['playlist_name'] == 'My Playlist'
    assert first['source'] == 'spotify'

    # Last line should be 'complete'
    last = json.loads(lines[-1])
    assert last['type'] == 'complete'


def test_marks_playlist_explored_at_end_of_stream():
    """When the streaming generator runs to completion, mark_mirrored_playlist_explored fires."""
    db = _FakeDB(
        playlist={'name': 'P', 'image_url': ''},
        tracks=[{'track_name': 'T', 'artist_name': 'A', 'album_name': 'B', 'extra_data': None}],
    )
    spotify = _FakeSpotify(search_results=[_FakeArtistMeta(name='A')])
    deps = _build_deps(
        request_payload={'playlist_id': '1', 'mode': 'discographies'},
        spotify=spotify,
        db=db,
    )

    ex.playlist_explorer_build_tree(deps)

    assert db.marked_explored is True


# ---------------------------------------------------------------------------
# Discovered-track grouping
# ---------------------------------------------------------------------------

def test_discovered_artist_grouping_uses_matched_data():
    """Tracks with matching-source extra_data → use matched_data['artists'][0]."""
    db = _FakeDB(
        playlist={'name': 'P', 'image_url': ''},
        tracks=[
            {
                'track_name': 'T',
                'artist_name': 'Local Artist Name',  # raw
                'album_name': 'Local Album',
                'extra_data': json.dumps({
                    'discovered': True,
                    'provider': 'spotify',
                    'matched_data': {
                        'artists': [{'name': 'Discovered Artist', 'id': 'sp-aid'}],
                        'album': {'name': 'Discovered Album'},
                    },
                }),
            },
        ],
    )
    spotify = _FakeSpotify(
        search_results=[_FakeArtistMeta(name='Discovered Artist')],
        albums=[_FakeAlbum(id='alb-1', name='Discovered Album', release_date='2024')],
    )
    deps = _build_deps(
        request_payload={'playlist_id': '1', 'mode': 'discographies'},
        spotify=spotify, db=db,
    )

    response = ex.playlist_explorer_build_tree(deps)

    artist_lines = _artist_lines(response)
    assert len(artist_lines) == 1
    assert artist_lines[0]['name'] == 'Discovered Artist'
    assert artist_lines[0]['artist_id'] == 'sp-aid'
class _FakeSoulseek:
    """Stand-in for the soulseek client handed to the stream-prep worker.

    Every coroutine ignores its arguments and returns a canned value chosen
    when the fake is constructed.
    """

    def __init__(self, *, download_id='dl-1', all_downloads=None):
        self._download_id = download_id
        # Fall back to a fresh empty list when the caller supplies nothing.
        if all_downloads is None:
            all_downloads = []
        self._all_downloads = all_downloads

    async def download(self, username, filename, size):
        """Return the configured download id (tests pass None to simulate a failed start)."""
        return self._download_id

    async def get_all_downloads(self):
        """Return the configured list of downloads unchanged."""
        return self._all_downloads

    async def signal_download_completion(self, download_id, username, remove=True):
        """Always report that the completion signal succeeded."""
        return True
def test_stream_folder_created(tmp_path):
    """Running the worker creates a Stream/ directory beneath project_root."""
    track = {'username': 'u', 'filename': 'x', 'size': 0}
    # download() yields None here, so no real transfer is ever started.
    failing_client = _FakeSoulseek(download_id=None)
    deps = _build_deps(project_root=str(tmp_path), soulseek=failing_client)

    sp.prepare_stream_task(track, deps)

    stream_dir = tmp_path / 'Stream'
    assert stream_dir.is_dir()
def test_download_returns_none_marks_error(tmp_path):
    """A None result from soulseek_client.download() leaves the stream state in 'error'."""
    deps = _build_deps(
        project_root=str(tmp_path),
        soulseek=_FakeSoulseek(download_id=None),
    )
    track = {'username': 'u', 'filename': 'x', 'size': 0}

    sp.prepare_stream_task(track, deps)

    final_status = deps._state['status']
    assert final_status == 'error'
def test_succeeded_state_with_partial_bytes_keeps_polling(tmp_path, monkeypatch):
    """'Succeeded' with bytesTransferred < size must not flip the state to 'ready'.

    The fake client reports the same incomplete transfer on every poll, so
    the worker can only leave its loop through its own timeout. Sleeping is
    stubbed out so that timeout is reached immediately instead of in real
    time.
    """
    # NOTE(review): assumes the worker paces its poll loop with time.sleep,
    # as the web_server.py implementation it replaces does — confirm against
    # core/streaming/prepare.py. If it does not, this stub is a harmless no-op.
    monkeypatch.setattr('time.sleep', lambda _seconds: None)

    download_status = {
        'id': 'dl-99',
        'state': 'Succeeded',
        'percentComplete': 100,
        'size': 100,
        'bytesTransferred': 50,  # incomplete
    }
    sk = _FakeSoulseek(download_id='dl-99', all_downloads=['stub'])
    deps = _build_deps(
        soulseek=sk,
        project_root=str(tmp_path),
        find_streaming_result=download_status,
    )

    sp.prepare_stream_task({'username': 'u', 'filename': 'x', 'size': 100}, deps)

    # Should NOT have gone to 'ready' because bytes were incomplete
    assert deps._state['status'] != 'ready'
- """ - loop = None - queue_start_time = None - actively_downloading = False - last_progress_sent = 0.0 - - try: - logger.info(f"Starting stream preparation for: {track_data.get('filename')}") - - # Update state to loading - with stream_lock: - stream_state.update({ - "status": "loading", - "progress": 0, - "track_info": track_data, - "file_path": None, - "error_message": None - }) - - # Get paths - download_path = docker_resolve_path(config_manager.get('soulseek.download_path', './downloads')) - project_root = os.path.dirname(os.path.abspath(__file__)) - stream_folder = os.path.join(project_root, 'Stream') - - # Ensure Stream directory exists - os.makedirs(stream_folder, exist_ok=True) - - # Clear any existing files in Stream folder (only one file at a time) - for existing_file in glob.glob(os.path.join(stream_folder, '*')): - try: - if os.path.isfile(existing_file): - os.remove(existing_file) - elif os.path.isdir(existing_file): - shutil.rmtree(existing_file) - logger.info(f"Cleared old stream file: {existing_file}") - except Exception as e: - logger.error(f"Could not remove existing stream file: {e}") - - # Start the download using the same mechanism as regular downloads - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - try: - download_result = loop.run_until_complete(soulseek_client.download( - track_data.get('username'), - track_data.get('filename'), - track_data.get('size', 0) - )) - - if not download_result: - with stream_lock: - stream_state.update({ - "status": "error", - "error_message": "Failed to initiate download - uploader may be offline" - }) - return - - logger.info("Download initiated for streaming") - - # Enhanced monitoring with queue timeout detection (matching GUI) - max_wait_time = 60 # Increased timeout - poll_interval = 1.5 # More frequent polling - queue_timeout = 15 # Queue timeout like GUI - wait_count = 0 - - while wait_count * poll_interval < max_wait_time: - wait_count += 1 - - # Check download progress via 
orchestrator (works for Soulseek and YouTube) - api_progress = None - download_state = None - download_status = None +# Stream-prep worker logic lives in core/streaming/prepare.py. +from core.streaming import prepare as _streaming_prepare - try: - # Use orchestrator's get_all_downloads() which works for both sources - all_downloads = loop.run_until_complete(soulseek_client.get_all_downloads()) - download_status = _find_streaming_download_in_all_downloads(all_downloads, track_data) - - if download_status: - api_progress = download_status.get('percentComplete', 0) - download_state = download_status.get('state', '').lower() - original_state = download_status.get('state', '') - - logger.info(f"API Download - State: {original_state}, Progress: {api_progress:.1f}%") - - # Track queue state timing (matching GUI logic) - is_queued = ('queued' in download_state or 'initializing' in download_state) - is_downloading = ('inprogress' in download_state or 'transferring' in download_state) - # Verify bytes match before trusting state/progress - _stream_expected = download_status.get('size', 0) - _stream_transferred = download_status.get('bytesTransferred', 0) - _bytes_ok = _stream_expected <= 0 or _stream_transferred >= _stream_expected - is_completed = ('succeeded' in download_state or api_progress >= 100) and _bytes_ok - - # Handle queue state timing - if is_queued and queue_start_time is None: - queue_start_time = time.time() - logger.info(f"Download entered queue state: {original_state}") - with stream_lock: - stream_state["status"] = "queued" - elif is_downloading and not actively_downloading: - actively_downloading = True - queue_start_time = None # Reset queue timer - logger.info(f"Download started actively downloading: {original_state}") - with stream_lock: - stream_state["status"] = "loading" - - # Check for queue timeout (matching GUI) - if is_queued and queue_start_time: - queue_elapsed = time.time() - queue_start_time - if queue_elapsed > queue_timeout: - 
logger.error(f"⏰ Queue timeout after {queue_elapsed:.1f}s - download stuck in queue") - with stream_lock: - stream_state.update({ - "status": "error", - "error_message": "Queue timeout - uploader not responding. Try another source." - }) - return - - # Update progress - with stream_lock: - if api_progress != last_progress_sent: - stream_state["progress"] = api_progress - last_progress_sent = api_progress - - # Check if download is complete - if is_completed: - logger.info(f"Download completed via API status: {original_state}") - - # Wait for file to stabilise on disk before moving - found_file = _find_downloaded_file(download_path, track_data) - if found_file: - _prev_sz = -1 - for _sc in range(4): - try: - _cur_sz = os.path.getsize(found_file) - except OSError: - _cur_sz = -1 - if _cur_sz == _prev_sz and _cur_sz > 0: - break - _prev_sz = _cur_sz - time.sleep(1.5) - # Re-find in case it wasn't found on first try - if not found_file: - found_file = _find_downloaded_file(download_path, track_data) - - # Retry file search a few times (matching GUI logic) - retry_attempts = 5 - for attempt in range(retry_attempts): - if found_file: - break - logger.warning(f"File not found yet, attempt {attempt + 1}/{retry_attempts}") - time.sleep(1) - found_file = _find_downloaded_file(download_path, track_data) - - if found_file: - logger.debug(f"Found downloaded file: {found_file}") - - # Move file to Stream folder - original_filename = extract_filename(found_file) - stream_path = os.path.join(stream_folder, original_filename) - - try: - shutil.move(found_file, stream_path) - logger.debug(f"Moved file to stream folder: {stream_path}") - - # Clean up empty directories (matching GUI) - _cleanup_empty_directories(download_path, found_file) - - # Update state to ready - with stream_lock: - stream_state.update({ - "status": "ready", - "progress": 100, - "file_path": stream_path - }) - - # Clean up download from slskd API - try: - download_id = download_status.get('id', '') - if 
download_id and track_data.get('username'): - success = loop.run_until_complete( - soulseek_client.signal_download_completion( - download_id, track_data.get('username'), remove=True) - ) - if success: - logger.debug(f"Cleaned up download {download_id} from API") - except Exception as e: - logger.error(f"Error cleaning up download: {e}") - - logger.info(f"Stream file ready for playback: {stream_path}") - return # Success! - - except Exception as e: - logger.error(f"Error moving file to stream folder: {e}") - with stream_lock: - stream_state.update({ - "status": "error", - "error_message": f"Failed to prepare stream file: {e}" - }) - return - else: - logger.error("Could not find downloaded file after completion") - with stream_lock: - stream_state.update({ - "status": "error", - "error_message": "Download completed but file not found" - }) - return - else: - # No transfer found in API - may still be initializing - logger.debug(f"No transfer found in API yet... (elapsed: {wait_count * poll_interval}s)") - - except Exception as e: - logger.error(f"Error checking download progress: {e}") - # Continue to next iteration if API call fails - - # Wait before next poll - time.sleep(poll_interval) - - # If we get here, download timed out - logger.warning(f"Download timed out after {max_wait_time}s") - with stream_lock: - stream_state.update({ - "status": "error", - "error_message": "Download timed out - try a different source" - }) - - except asyncio.CancelledError: - logger.warning("Stream task cancelled") - with stream_lock: - stream_state.update({ - "status": "stopped", - "error_message": None - }) - finally: - if loop: - try: - # Clean up any pending tasks - pending = asyncio.all_tasks(loop) - if pending: - loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) - loop.close() - except Exception as e: - logger.error(f"Error cleaning up streaming event loop: {e}") - - except Exception as e: - logger.error(f"Stream preparation failed: {e}") - with 
def _build_prepare_stream_deps():
    """Assemble a fresh PrepareStreamDeps from the current web_server.py globals.

    Built anew on every call, so the bundle reflects whatever the module
    globals hold at that moment.
    """
    def _replace_state(value):
        # Rebinds the module-level name rather than mutating the old object.
        global stream_state
        stream_state = value

    def _read_state():
        return stream_state

    this_file = os.path.abspath(__file__)

    return _streaming_prepare.PrepareStreamDeps(
        config_manager=config_manager,
        soulseek_client=soulseek_client,
        stream_lock=stream_lock,
        project_root=os.path.dirname(this_file),
        docker_resolve_path=docker_resolve_path,
        find_streaming_download_in_all_downloads=_find_streaming_download_in_all_downloads,
        find_downloaded_file=_find_downloaded_file,
        extract_filename=extract_filename,
        cleanup_empty_directories=_cleanup_empty_directories,
        _get_stream_state=_read_state,
        _set_stream_state=_replace_state,
    )
- Uses and populates the metadata cache to avoid redundant API calls.""" - try: - data = request.get_json() - if not data: - return jsonify({"success": False, "error": "No data provided"}), 400 - - playlist_id = data.get('playlist_id') - mode = data.get('mode', 'albums') # 'albums' or 'discographies' - - if not playlist_id: - return jsonify({"success": False, "error": "playlist_id is required"}), 400 - if mode not in ('albums', 'discographies'): - return jsonify({"success": False, "error": "mode must be 'albums' or 'discographies'"}), 400 - - database = get_database() - playlist = database.get_mirrored_playlist(playlist_id) - if not playlist: - return jsonify({"success": False, "error": "Playlist not found"}), 404 - - tracks = database.get_mirrored_playlist_tracks(playlist_id) - if not tracks: - return jsonify({"success": False, "error": "Playlist has no tracks"}), 400 - - # Determine active metadata source — respect user's configured primary - source_name = _get_active_discovery_source() - if source_name == 'spotify' and spotify_client and spotify_client.is_spotify_authenticated(): - active_client = spotify_client - else: - active_client = _get_metadata_fallback_client() - source_name = _get_metadata_fallback_source() - - cache = get_metadata_cache() - - # Parse extra_data and group tracks by artist using discovered data - artist_groups = {} - for t in tracks: - extra = {} - if t.get('extra_data'): - try: - extra = json.loads(t['extra_data']) if isinstance(t['extra_data'], str) else t['extra_data'] - except (json.JSONDecodeError, TypeError): - pass - - # Only use discovery data if it matches the active metadata source - is_discovered = extra.get('discovered', False) - provider = (extra.get('provider') or '').lower() - source_matches = provider == source_name or (provider in ('itunes', 'apple') and source_name == 'itunes') - - matched = extra.get('matched_data', {}) if (is_discovered and source_matches) else {} - artists_list = matched.get('artists', []) - 
primary_artist = artists_list[0] if artists_list else None - # Artists can be dicts {"name": "X", "id": "Y"} or plain strings "X" - if isinstance(primary_artist, dict): - artist_name = primary_artist.get('name') or (t.get('artist_name') or '').strip() - artist_id = primary_artist.get('id') or None - elif isinstance(primary_artist, str): - artist_name = primary_artist or (t.get('artist_name') or '').strip() - artist_id = None - else: - artist_name = (t.get('artist_name') or '').strip() - artist_id = None - - if not artist_name: - continue - - key = artist_name.lower() - if key not in artist_groups: - artist_groups[key] = { - 'name': artist_name, - 'artist_id': artist_id, # Pre-resolved from discovery - 'tracks': [], - 'album_names': set(), - 'discovered': extra.get('discovered', False), - } - # If we get an artist_id from a later track but didn't have one before, fill it in - if artist_id and not artist_groups[key].get('artist_id'): - artist_groups[key]['artist_id'] = artist_id - - artist_groups[key]['tracks'].append(t.get('track_name', '')) - # Get album name from discovered data or playlist field - album_name = '' - album_data = matched.get('album') - if isinstance(album_data, dict) and album_data.get('name'): - album_name = album_data['name'] - elif (t.get('album_name') or '').strip(): - album_name = t['album_name'].strip() - if album_name: - artist_groups[key]['album_names'].add(album_name) - - def _normalize_for_match(title): - import re - return re.sub(r'\s*[\(\[][^)\]]*[\)\]]', '', title).strip().lower() - - def _fetch_artist_discography(artist_name, known_artist_id=None): - """Fetch discography using the active client. Checks cache first, stores results after. 
- If known_artist_id is provided (from discovery cache), skips the name search.""" - # Check cache for this artist's discography - cache_key = f"explorer_disco_{artist_name.lower().strip()}" - cached = cache.get_entity(source_name, 'artist_discography', cache_key) if cache else None - if cached and isinstance(cached, dict) and cached.get('albums'): - logger.debug(f"Explorer: cache hit for '{artist_name}' discography") - return cached - - artist_id = known_artist_id - artist_image = None - - if artist_id: - # Already have the ID from discovery — just fetch the artist image - try: - artist_info = active_client.get_artist(artist_id) - if artist_info: - if isinstance(artist_info, dict): - images = artist_info.get('images') or [] - artist_image = images[0].get('url') if images else None - elif hasattr(artist_info, 'image_url'): - artist_image = artist_info.image_url - except Exception: - pass - else: - # No pre-resolved ID — search by name - try: - search_results = active_client.search_artists(artist_name, limit=5) - except Exception as e: - return {'success': False, 'error': f'Search failed: {e}'} - - if not search_results: - return {'success': False, 'error': f'"{artist_name}" not found'} - - # Find best match (exact first, then fuzzy) - best = None - for a in search_results: - if a.name.lower().strip() == artist_name.lower().strip(): - best = a - break - if not best: - best = search_results[0] - - artist_id = best.id - artist_image = best.image_url if hasattr(best, 'image_url') else None - - # Fetch albums - try: - # skip_cache only supported by spotify_client — other clients don't cache this call - _skip = {'skip_cache': True} if hasattr(active_client, 'sp') else {} - all_albums = active_client.get_artist_albums(artist_id, album_type='album,single', **_skip) - except Exception as e: - return {'success': False, 'error': f'Album fetch failed: {e}'} - - if not all_albums: - return {'success': False, 'error': 'No albums found'} - - # Check which albums the user already 
owns - owned_titles = set() - try: - db = get_database() - with db._get_connection() as conn: - cursor = conn.cursor() - # Find all artists in DB matching this name - cursor.execute("SELECT id FROM artists WHERE LOWER(name) = LOWER(?)", (artist_name,)) - artist_rows = cursor.fetchall() - for ar in artist_rows: - cursor.execute("SELECT title FROM albums WHERE artist_id = ?", (ar['id'],)) - for alb_row in cursor.fetchall(): - owned_titles.add((alb_row['title'] or '').strip().lower()) - except Exception: - pass # Non-critical — owned badges just won't show - - # Build release list - releases = [] - for album in all_albums: - # Skip albums where this artist isn't primary - if hasattr(album, 'artist_ids') and album.artist_ids and album.artist_ids[0] != artist_id: - continue - releases.append({ - 'title': album.name, - 'year': album.release_date[:4] if album.release_date else None, - 'image_url': album.image_url, - 'spotify_id': album.id, - 'track_count': album.total_tracks, - 'album_type': (album.album_type or 'album').lower(), - 'owned': (album.name or '').strip().lower() in owned_titles, - }) - - result = { - 'success': True, - 'name': artist_name, # Required for metadata cache validation - 'albums': releases, - 'artist_image': artist_image, - 'artist_id': artist_id, - 'artist_name': artist_name, - } - - # Store in cache - if cache and releases: - try: - cache.store_entity(source_name, 'artist_discography', cache_key, result) - except Exception: - pass - - return result - - def generate(): - yield json.dumps({ - "type": "meta", - "playlist_name": playlist.get('name', 'Unknown Playlist'), - "playlist_image": playlist.get('image_url', ''), - "total_artists": len(artist_groups), - "total_tracks": len(tracks), - "source": source_name, - }) + '\n' - - total_albums = 0 - - for idx, (_key, group) in enumerate(artist_groups.items()): - artist_name = group['name'] - playlist_track_names = group['tracks'] - playlist_album_names = group['album_names'] - - try: - disco = 
_fetch_artist_discography(artist_name, group.get('artist_id')) - - if not disco.get('success'): - yield json.dumps({ - "type": "artist", - "name": artist_name, - "artist_id": None, - "image_url": None, - "playlist_tracks": playlist_track_names, - "albums": [], - "error": disco.get('error', 'Not found'), - }) + '\n' - time.sleep(0.1) - continue - - # Tag each release with in_playlist flag - # If no album names available, fall back to matching track names against single titles - match_names = playlist_album_names - if not match_names: - match_names = set(playlist_track_names) - - all_releases = [] - for release in disco.get('albums', []): - r = dict(release) - norm_title = _normalize_for_match(r['title']) - r['in_playlist'] = any( - _normalize_for_match(a) == norm_title or - norm_title in _normalize_for_match(a) or - _normalize_for_match(a) in norm_title - for a in match_names - ) - all_releases.append(r) - - # Filter based on mode - if mode == 'albums': - filtered = [r for r in all_releases if r['in_playlist']] - else: - filtered = all_releases +# Playlist explorer build-tree route lives in core/playlists/explorer.py. 
def _build_playlist_explorer_deps():
    """Collect the Flask objects and web_server.py helpers the explorer route needs.

    Returns a new PlaylistExplorerDeps on each call, populated from the
    module-level names as they stand at call time.
    """
    wiring = {
        # Flask request/response plumbing
        'request': request,
        'flask_response': Response,
        'flask_jsonify': jsonify,
        # Metadata clients and lookups
        'spotify_client': spotify_client,
        'get_database': get_database,
        'get_active_discovery_source': _get_active_discovery_source,
        'get_metadata_fallback_client': _get_metadata_fallback_client,
        'get_metadata_fallback_source': _get_metadata_fallback_source,
        'get_metadata_cache': get_metadata_cache,
    }
    return _playlists_explorer.PlaylistExplorerDeps(**wiring)
traceback.print_exc() - return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/playlist-explorer/album-tracks/', methods=['GET'])