Lift _prepare_stream_task + playlist_explorer_build_tree to core/

Final lift in the web_server.py extraction effort. Pulls two route
handlers + one background worker out of `web_server.py` into new
focused packages:

- `core/streaming/prepare.py` — 258-line stream-prep worker that
  downloads a track to the local Stream/ folder for the browser audio
  player.
- `core/playlists/explorer.py` — 305-line route handler for
  `POST /api/playlist-explorer/build-tree` that streams an NDJSON
  discography tree from a mirrored playlist.

What `prepare_stream_task` does:

1. Reset stream state to 'loading' with the new track info.
2. Clear any prior file from Stream/ (only one stream lives there).
3. Spin up a fresh asyncio event loop and `soulseek_client.download()`.
4. Poll progress every 1.5s. Queue timeout 15s; overall 60s.
5. On succeeded + bytes-match: find the file with retry, move into
   Stream/, signal slskd completion, mark state 'ready' with file_path.
6. On error/timeout/cancel: state goes to 'error' or 'stopped'.
7. Finally: tear down the event loop cleanly.

What `playlist_explorer_build_tree` does:

1. Validate request, load playlist + tracks from DB.
2. Pick active metadata source (Spotify if authed, else fallback).
3. Group tracks by artist using discovered matched_data when the
   provider matches the active source.
4. Stream NDJSON: meta line → one artist line per group → complete line.
5. Per artist: cache check → resolve discography → tag releases with
   `in_playlist` flag based on title-similarity match → filter by mode
   (`albums` = only matches; `discographies` = full disco).
6. Mark playlist as explored on completion.

Strict 1:1 byte parity:
Both functions exposed their dependencies through proxy patterns
established in earlier lifts (PR4–PR8). For prepare_stream_task,
`stream_state` is a deps property; for the explorer, Flask `request` /
`jsonify` / `Response` are injected via deps so the lifted body keeps
its native syntax. Both lifts verified ZERO diff against the original
after `deps.X` → global X normalization.

258 lines orig = 258 lines lifted (prepare_stream_task).
305 lines orig = 305 lines lifted (explorer).

Bonus cleanup: web_server.py's module-level `import shutil` and
`import glob` were now unused (only `_prepare_stream_task` used them
at module scope; every other reference is via inline `import shutil`
in respective function bodies). Removed both module-level imports —
ruff caught the F811 redefinitions and confirmed they're truly
redundant.

Dependencies for `PrepareStreamDeps` (11 fields):
config_manager, soulseek_client, stream_lock, project_root,
docker_resolve_path, find_streaming_download_in_all_downloads,
find_downloaded_file, extract_filename, cleanup_empty_directories,
plus 2 stream_state property delegates.

Dependencies for `PlaylistExplorerDeps` (9 fields):
Flask request/Response/jsonify, spotify_client, get_database,
get_active_discovery_source, get_metadata_fallback_client,
get_metadata_fallback_source, get_metadata_cache.

Tests: 6 new under tests/streaming/test_prepare.py (state init,
Stream/ folder creation + clearing, download-init failure, completed
+ moved + ready state, partial-bytes incomplete-warning path) plus 9
new under tests/playlists/test_explorer.py (5 validation early-exit
paths, streaming response shape with meta/complete lines, mark-
explored side effect, discovered-artist grouping using matched_data,
provider mismatch falling back to raw artist name).

Full suite: 1355 passing (was 1340). Ruff clean.

End of the web_server.py extraction effort. Started at ~45,000 lines
across PR4–PR8 + this commit; finished around 35,000 lines with the
heavy worker + route logic now living in domain-cohesive packages
under core/. The remaining bulk in web_server.py is route handlers,
service initialization, and the deferred 1530-line
`_register_automation_handlers` (startup-only, marginal lift value).
pull/424/head
Broque Thomas 4 weeks ago
parent c89f7cc22e
commit 5c8b8b271a

@ -0,0 +1,363 @@
"""Playlist explorer build-tree route.
`playlist_explorer_build_tree(deps)` is the body of the
`POST /api/playlist-explorer/build-tree` route. It builds a discovery
tree from a mirrored playlist and streams the result as NDJSON
(one JSON object per artist line + a final 'complete' line).
Works with Spotify (preferred), iTunes, or Deezer as the metadata
source. Uses and populates the metadata cache to avoid redundant API
calls per discography fetch.
Two operating modes:
- `albums`: only show releases that overlap with the playlist's tracks.
- `discographies`: show the full discography of every artist in the
playlist, with `in_playlist` flag on the matching releases.
Per-artist flow inside the streaming generator:
1. Resolve discography via `_fetch_artist_discography` (cache fall
through to live API search).
2. Tag each release with `in_playlist` based on title-similarity match
against the playlist's track/album names.
3. Apply mode filter, sort by in-playlist-first then year DESC.
4. Yield one JSON line per artist.
The route returns Flask's streaming `Response` wrapper around the NDJSON
generator. Early-exit cases (bad request, playlist not found, top-level
exception) yield via Flask's standard `jsonify(...), status` shape.
Lifted verbatim from web_server.py. Wide dependency surface (Flask
`request` + `Response`, Spotify client, multiple metadata helpers,
DB access, metadata cache) all injected via `PlaylistExplorerDeps`.
"""
from __future__ import annotations
import json
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable
logger = logging.getLogger(__name__)
@dataclass
class PlaylistExplorerDeps:
"""Bundle of cross-cutting deps the playlist explorer needs."""
request: Any # flask.request proxy
flask_response: Any # flask.Response constructor
flask_jsonify: Any # flask.jsonify
spotify_client: Any
get_database: Callable[[], Any]
get_active_discovery_source: Callable[[], str]
get_metadata_fallback_client: Callable[[], Any]
get_metadata_fallback_source: Callable[[], str]
get_metadata_cache: Callable[[], Any]
def playlist_explorer_build_tree(deps: PlaylistExplorerDeps):
"""Build a discovery tree from a mirrored playlist.
Streams NDJSON: one line per artist with their albums.
Works with Spotify, iTunes, or Deezer as the metadata source.
Uses and populates the metadata cache to avoid redundant API calls."""
try:
data = deps.request.get_json()
if not data:
return deps.flask_jsonify({"success": False, "error": "No data provided"}), 400
playlist_id = data.get('playlist_id')
mode = data.get('mode', 'albums') # 'albums' or 'discographies'
if not playlist_id:
return deps.flask_jsonify({"success": False, "error": "playlist_id is required"}), 400
if mode not in ('albums', 'discographies'):
return deps.flask_jsonify({"success": False, "error": "mode must be 'albums' or 'discographies'"}), 400
database = deps.get_database()
playlist = database.get_mirrored_playlist(playlist_id)
if not playlist:
return deps.flask_jsonify({"success": False, "error": "Playlist not found"}), 404
tracks = database.get_mirrored_playlist_tracks(playlist_id)
if not tracks:
return deps.flask_jsonify({"success": False, "error": "Playlist has no tracks"}), 400
# Determine active metadata source — respect user's configured primary
source_name = deps.get_active_discovery_source()
if source_name == 'spotify' and deps.spotify_client and deps.spotify_client.is_spotify_authenticated():
active_client = deps.spotify_client
else:
active_client = deps.get_metadata_fallback_client()
source_name = deps.get_metadata_fallback_source()
cache = deps.get_metadata_cache()
# Parse extra_data and group tracks by artist using discovered data
artist_groups = {}
for t in tracks:
extra = {}
if t.get('extra_data'):
try:
extra = json.loads(t['extra_data']) if isinstance(t['extra_data'], str) else t['extra_data']
except (json.JSONDecodeError, TypeError):
pass
# Only use discovery data if it matches the active metadata source
is_discovered = extra.get('discovered', False)
provider = (extra.get('provider') or '').lower()
source_matches = provider == source_name or (provider in ('itunes', 'apple') and source_name == 'itunes')
matched = extra.get('matched_data', {}) if (is_discovered and source_matches) else {}
artists_list = matched.get('artists', [])
primary_artist = artists_list[0] if artists_list else None
# Artists can be dicts {"name": "X", "id": "Y"} or plain strings "X"
if isinstance(primary_artist, dict):
artist_name = primary_artist.get('name') or (t.get('artist_name') or '').strip()
artist_id = primary_artist.get('id') or None
elif isinstance(primary_artist, str):
artist_name = primary_artist or (t.get('artist_name') or '').strip()
artist_id = None
else:
artist_name = (t.get('artist_name') or '').strip()
artist_id = None
if not artist_name:
continue
key = artist_name.lower()
if key not in artist_groups:
artist_groups[key] = {
'name': artist_name,
'artist_id': artist_id, # Pre-resolved from discovery
'tracks': [],
'album_names': set(),
'discovered': extra.get('discovered', False),
}
# If we get an artist_id from a later track but didn't have one before, fill it in
if artist_id and not artist_groups[key].get('artist_id'):
artist_groups[key]['artist_id'] = artist_id
artist_groups[key]['tracks'].append(t.get('track_name', ''))
# Get album name from discovered data or playlist field
album_name = ''
album_data = matched.get('album')
if isinstance(album_data, dict) and album_data.get('name'):
album_name = album_data['name']
elif (t.get('album_name') or '').strip():
album_name = t['album_name'].strip()
if album_name:
artist_groups[key]['album_names'].add(album_name)
def _normalize_for_match(title):
import re
return re.sub(r'\s*[\(\[][^)\]]*[\)\]]', '', title).strip().lower()
def _fetch_artist_discography(artist_name, known_artist_id=None):
"""Fetch discography using the active client. Checks cache first, stores results after.
If known_artist_id is provided (from discovery cache), skips the name search."""
# Check cache for this artist's discography
cache_key = f"explorer_disco_{artist_name.lower().strip()}"
cached = cache.get_entity(source_name, 'artist_discography', cache_key) if cache else None
if cached and isinstance(cached, dict) and cached.get('albums'):
logger.debug(f"Explorer: cache hit for '{artist_name}' discography")
return cached
artist_id = known_artist_id
artist_image = None
if artist_id:
# Already have the ID from discovery — just fetch the artist image
try:
artist_info = active_client.get_artist(artist_id)
if artist_info:
if isinstance(artist_info, dict):
images = artist_info.get('images') or []
artist_image = images[0].get('url') if images else None
elif hasattr(artist_info, 'image_url'):
artist_image = artist_info.image_url
except Exception:
pass
else:
# No pre-resolved ID — search by name
try:
search_results = active_client.search_artists(artist_name, limit=5)
except Exception as e:
return {'success': False, 'error': f'Search failed: {e}'}
if not search_results:
return {'success': False, 'error': f'"{artist_name}" not found'}
# Find best match (exact first, then fuzzy)
best = None
for a in search_results:
if a.name.lower().strip() == artist_name.lower().strip():
best = a
break
if not best:
best = search_results[0]
artist_id = best.id
artist_image = best.image_url if hasattr(best, 'image_url') else None
# Fetch albums
try:
# skip_cache only supported by spotify_client — other clients don't cache this call
_skip = {'skip_cache': True} if hasattr(active_client, 'sp') else {}
all_albums = active_client.get_artist_albums(artist_id, album_type='album,single', **_skip)
except Exception as e:
return {'success': False, 'error': f'Album fetch failed: {e}'}
if not all_albums:
return {'success': False, 'error': 'No albums found'}
# Check which albums the user already owns
owned_titles = set()
try:
db = deps.get_database()
with db._get_connection() as conn:
cursor = conn.cursor()
# Find all artists in DB matching this name
cursor.execute("SELECT id FROM artists WHERE LOWER(name) = LOWER(?)", (artist_name,))
artist_rows = cursor.fetchall()
for ar in artist_rows:
cursor.execute("SELECT title FROM albums WHERE artist_id = ?", (ar['id'],))
for alb_row in cursor.fetchall():
owned_titles.add((alb_row['title'] or '').strip().lower())
except Exception:
pass # Non-critical — owned badges just won't show
# Build release list
releases = []
for album in all_albums:
# Skip albums where this artist isn't primary
if hasattr(album, 'artist_ids') and album.artist_ids and album.artist_ids[0] != artist_id:
continue
releases.append({
'title': album.name,
'year': album.release_date[:4] if album.release_date else None,
'image_url': album.image_url,
'spotify_id': album.id,
'track_count': album.total_tracks,
'album_type': (album.album_type or 'album').lower(),
'owned': (album.name or '').strip().lower() in owned_titles,
})
result = {
'success': True,
'name': artist_name, # Required for metadata cache validation
'albums': releases,
'artist_image': artist_image,
'artist_id': artist_id,
'artist_name': artist_name,
}
# Store in cache
if cache and releases:
try:
cache.store_entity(source_name, 'artist_discography', cache_key, result)
except Exception:
pass
return result
def generate():
yield json.dumps({
"type": "meta",
"playlist_name": playlist.get('name', 'Unknown Playlist'),
"playlist_image": playlist.get('image_url', ''),
"total_artists": len(artist_groups),
"total_tracks": len(tracks),
"source": source_name,
}) + '\n'
total_albums = 0
for idx, (_key, group) in enumerate(artist_groups.items()):
artist_name = group['name']
playlist_track_names = group['tracks']
playlist_album_names = group['album_names']
try:
disco = _fetch_artist_discography(artist_name, group.get('artist_id'))
if not disco.get('success'):
yield json.dumps({
"type": "artist",
"name": artist_name,
"artist_id": None,
"image_url": None,
"playlist_tracks": playlist_track_names,
"albums": [],
"error": disco.get('error', 'Not found'),
}) + '\n'
time.sleep(0.1)
continue
# Tag each release with in_playlist flag
# If no album names available, fall back to matching track names against single titles
match_names = playlist_album_names
if not match_names:
match_names = set(playlist_track_names)
all_releases = []
for release in disco.get('albums', []):
r = dict(release)
norm_title = _normalize_for_match(r['title'])
r['in_playlist'] = any(
_normalize_for_match(a) == norm_title or
norm_title in _normalize_for_match(a) or
_normalize_for_match(a) in norm_title
for a in match_names
)
all_releases.append(r)
# Filter based on mode
if mode == 'albums':
filtered = [r for r in all_releases if r['in_playlist']]
else:
filtered = all_releases
filtered.sort(key=lambda r: (not r.get('in_playlist', False), -(int(r.get('year') or 0))))
total_albums += len(filtered)
yield json.dumps({
"type": "artist",
"name": disco.get('artist_name', artist_name),
"artist_id": disco.get('artist_id'),
"image_url": disco.get('artist_image'),
"playlist_tracks": playlist_track_names,
"albums": filtered,
}) + '\n'
except Exception as e:
logger.error(f"Explorer: error processing artist '{artist_name}': {e}")
yield json.dumps({
"type": "artist",
"name": artist_name,
"artist_id": None,
"image_url": None,
"playlist_tracks": playlist_track_names,
"albums": [],
"error": str(e),
}) + '\n'
# Rate limit protection between artists
if idx < len(artist_groups) - 1:
time.sleep(0.2)
deps.get_database().mark_mirrored_playlist_explored(playlist_id)
yield json.dumps({"type": "complete", "total_artists": len(artist_groups), "total_albums": total_albums}) + '\n'
return deps.flask_response(generate(), mimetype='application/x-ndjson', headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no',
})
except Exception as e:
logger.error(f"Playlist Explorer build-tree error: {e}")
import traceback
traceback.print_exc()
return deps.flask_jsonify({"success": False, "error": str(e)}), 500

@ -0,0 +1,326 @@
"""Streaming preparation worker.
`prepare_stream_task(track_data, deps)` is the function the stream
executor submits to fetch a track from Soulseek/YouTube/etc and stage
it in the local Stream/ folder for the browser audio player.
1. Reset stream state to 'loading' with the new track info.
2. Clear any prior file from the Stream/ folder (only one stream lives
there at a time).
3. Spin up a fresh asyncio event loop and `soulseek_client.download()`
the track.
4. Poll `soulseek_client.get_all_downloads()` every 1.5 s to track
progress, with separate handling for queued vs actively downloading
states. Queue timeout = 15 s; overall timeout = 60 s.
5. On completion (state ~ 'succeeded' or progress >= 100% AND bytes
transferred match expected size), find the downloaded file with retry
logic, move it into Stream/, signal completion to the slskd API, and
mark stream_state as 'ready' with the file path.
6. On any error/timeout/cancel: stream_state goes to 'error' or
'stopped' with an explanatory message.
7. Finally: tear down the event loop cleanly.
The original mutated `stream_state` as a module global. Here it's
exposed through the `PrepareStreamDeps` proxy as a Python property so
the lifted body keeps the same `name[key] = value` syntax. The setter
fires only if the function reassigns (currently it only mutates in
place via .update() and key assignment).
"""
from __future__ import annotations
import asyncio
import glob
import logging
import os
import shutil
import time
from dataclasses import dataclass
from typing import Any, Callable
logger = logging.getLogger(__name__)
@dataclass
class PrepareStreamDeps:
"""Bundle of cross-cutting deps the stream-prep worker needs."""
config_manager: Any
soulseek_client: Any
stream_lock: Any # threading.Lock
project_root: str # absolute path to web_server.py's directory
docker_resolve_path: Callable[[str], str]
find_streaming_download_in_all_downloads: Callable
find_downloaded_file: Callable
extract_filename: Callable[[str], str]
cleanup_empty_directories: Callable
_get_stream_state: Callable[[], dict]
_set_stream_state: Callable[[dict], None]
@property
def stream_state(self) -> dict:
return self._get_stream_state()
@stream_state.setter
def stream_state(self, value: dict) -> None:
self._set_stream_state(value)
def prepare_stream_task(track_data, deps: PrepareStreamDeps):
"""
Background streaming task that downloads track to Stream folder and updates global state.
Enhanced version with robust error handling matching the GUI StreamingThread.
"""
loop = None
queue_start_time = None
actively_downloading = False
last_progress_sent = 0.0
try:
logger.info(f"Starting stream preparation for: {track_data.get('filename')}")
# Update state to loading
with deps.stream_lock:
deps.stream_state.update({
"status": "loading",
"progress": 0,
"track_info": track_data,
"file_path": None,
"error_message": None
})
# Get paths
download_path = deps.docker_resolve_path(deps.config_manager.get('soulseek.download_path', './downloads'))
project_root = deps.project_root
stream_folder = os.path.join(project_root, 'Stream')
# Ensure Stream directory exists
os.makedirs(stream_folder, exist_ok=True)
# Clear any existing files in Stream folder (only one file at a time)
for existing_file in glob.glob(os.path.join(stream_folder, '*')):
try:
if os.path.isfile(existing_file):
os.remove(existing_file)
elif os.path.isdir(existing_file):
shutil.rmtree(existing_file)
logger.info(f"Cleared old stream file: {existing_file}")
except Exception as e:
logger.error(f"Could not remove existing stream file: {e}")
# Start the download using the same mechanism as regular downloads
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
download_result = loop.run_until_complete(deps.soulseek_client.download(
track_data.get('username'),
track_data.get('filename'),
track_data.get('size', 0)
))
if not download_result:
with deps.stream_lock:
deps.stream_state.update({
"status": "error",
"error_message": "Failed to initiate download - uploader may be offline"
})
return
logger.info("Download initiated for streaming")
# Enhanced monitoring with queue timeout detection (matching GUI)
max_wait_time = 60 # Increased timeout
poll_interval = 1.5 # More frequent polling
queue_timeout = 15 # Queue timeout like GUI
wait_count = 0
while wait_count * poll_interval < max_wait_time:
wait_count += 1
# Check download progress via orchestrator (works for Soulseek and YouTube)
api_progress = None
download_state = None
download_status = None
try:
# Use orchestrator's get_all_downloads() which works for both sources
all_downloads = loop.run_until_complete(deps.soulseek_client.get_all_downloads())
download_status = deps.find_streaming_download_in_all_downloads(all_downloads, track_data)
if download_status:
api_progress = download_status.get('percentComplete', 0)
download_state = download_status.get('state', '').lower()
original_state = download_status.get('state', '')
logger.info(f"API Download - State: {original_state}, Progress: {api_progress:.1f}%")
# Track queue state timing (matching GUI logic)
is_queued = ('queued' in download_state or 'initializing' in download_state)
is_downloading = ('inprogress' in download_state or 'transferring' in download_state)
# Verify bytes match before trusting state/progress
_stream_expected = download_status.get('size', 0)
_stream_transferred = download_status.get('bytesTransferred', 0)
_bytes_ok = _stream_expected <= 0 or _stream_transferred >= _stream_expected
is_completed = ('succeeded' in download_state or api_progress >= 100) and _bytes_ok
# Handle queue state timing
if is_queued and queue_start_time is None:
queue_start_time = time.time()
logger.info(f"Download entered queue state: {original_state}")
with deps.stream_lock:
deps.stream_state["status"] = "queued"
elif is_downloading and not actively_downloading:
actively_downloading = True
queue_start_time = None # Reset queue timer
logger.info(f"Download started actively downloading: {original_state}")
with deps.stream_lock:
deps.stream_state["status"] = "loading"
# Check for queue timeout (matching GUI)
if is_queued and queue_start_time:
queue_elapsed = time.time() - queue_start_time
if queue_elapsed > queue_timeout:
logger.error(f"⏰ Queue timeout after {queue_elapsed:.1f}s - download stuck in queue")
with deps.stream_lock:
deps.stream_state.update({
"status": "error",
"error_message": "Queue timeout - uploader not responding. Try another source."
})
return
# Update progress
with deps.stream_lock:
if api_progress != last_progress_sent:
deps.stream_state["progress"] = api_progress
last_progress_sent = api_progress
# Check if download is complete
if is_completed:
logger.info(f"Download completed via API status: {original_state}")
# Wait for file to stabilise on disk before moving
found_file = deps.find_downloaded_file(download_path, track_data)
if found_file:
_prev_sz = -1
for _sc in range(4):
try:
_cur_sz = os.path.getsize(found_file)
except OSError:
_cur_sz = -1
if _cur_sz == _prev_sz and _cur_sz > 0:
break
_prev_sz = _cur_sz
time.sleep(1.5)
# Re-find in case it wasn't found on first try
if not found_file:
found_file = deps.find_downloaded_file(download_path, track_data)
# Retry file search a few times (matching GUI logic)
retry_attempts = 5
for attempt in range(retry_attempts):
if found_file:
break
logger.warning(f"File not found yet, attempt {attempt + 1}/{retry_attempts}")
time.sleep(1)
found_file = deps.find_downloaded_file(download_path, track_data)
if found_file:
logger.debug(f"Found downloaded file: {found_file}")
# Move file to Stream folder
original_filename = deps.extract_filename(found_file)
stream_path = os.path.join(stream_folder, original_filename)
try:
shutil.move(found_file, stream_path)
logger.debug(f"Moved file to stream folder: {stream_path}")
# Clean up empty directories (matching GUI)
deps.cleanup_empty_directories(download_path, found_file)
# Update state to ready
with deps.stream_lock:
deps.stream_state.update({
"status": "ready",
"progress": 100,
"file_path": stream_path
})
# Clean up download from slskd API
try:
download_id = download_status.get('id', '')
if download_id and track_data.get('username'):
success = loop.run_until_complete(
deps.soulseek_client.signal_download_completion(
download_id, track_data.get('username'), remove=True)
)
if success:
logger.debug(f"Cleaned up download {download_id} from API")
except Exception as e:
logger.error(f"Error cleaning up download: {e}")
logger.info(f"Stream file ready for playback: {stream_path}")
return # Success!
except Exception as e:
logger.error(f"Error moving file to stream folder: {e}")
with deps.stream_lock:
deps.stream_state.update({
"status": "error",
"error_message": f"Failed to prepare stream file: {e}"
})
return
else:
logger.error("Could not find downloaded file after completion")
with deps.stream_lock:
deps.stream_state.update({
"status": "error",
"error_message": "Download completed but file not found"
})
return
else:
# No transfer found in API - may still be initializing
logger.debug(f"No transfer found in API yet... (elapsed: {wait_count * poll_interval}s)")
except Exception as e:
logger.error(f"Error checking download progress: {e}")
# Continue to next iteration if API call fails
# Wait before next poll
time.sleep(poll_interval)
# If we get here, download timed out
logger.warning(f"Download timed out after {max_wait_time}s")
with deps.stream_lock:
deps.stream_state.update({
"status": "error",
"error_message": "Download timed out - try a different source"
})
except asyncio.CancelledError:
logger.warning("Stream task cancelled")
with deps.stream_lock:
deps.stream_state.update({
"status": "stopped",
"error_message": None
})
finally:
if loop:
try:
# Clean up any pending tasks
pending = asyncio.all_tasks(loop)
if pending:
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
loop.close()
except Exception as e:
logger.error(f"Error cleaning up streaming event loop: {e}")
except Exception as e:
logger.error(f"Stream preparation failed: {e}")
with deps.stream_lock:
deps.stream_state.update({
"status": "error",
"error_message": f"Streaming error: {str(e)}"
})

@ -0,0 +1,327 @@
"""Tests for core/playlists/explorer.py — playlist explorer build-tree route."""
from __future__ import annotations
import json
from dataclasses import dataclass
import pytest
from core.playlists import explorer as ex
# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------
class _FakeRequest:
def __init__(self, payload):
self._payload = payload
def get_json(self):
return self._payload
class _FakeResponse:
"""Captures the streaming generator + headers."""
def __init__(self, generator, mimetype=None, headers=None):
self.generator = generator
self.mimetype = mimetype
self.headers = headers or {}
self.body_lines = list(generator)
def _fake_jsonify(payload):
"""Returns the payload dict as-is — the wrapper does the actual jsonify."""
return payload
@dataclass
class _FakeAlbum:
id: str
name: str
release_date: str = '2024-01-01'
image_url: str = ''
total_tracks: int = 10
album_type: str = 'album'
artist_ids: list = None
class _FakeArtistMeta:
def __init__(self, name='Artist', id='a-1'):
self.name = name
self.id = id
self.image_url = 'http://art-img'
class _FakeSpotify:
def __init__(self, *, authenticated=True, search_results=None, albums=None):
self._authenticated = authenticated
self._search_results = search_results or []
self._albums = albums or []
self.sp = object() # pretends to be spotify (skip_cache support)
def is_spotify_authenticated(self):
return self._authenticated
def search_artists(self, name, limit=5):
return self._search_results
def get_artist_albums(self, artist_id, album_type='album,single', skip_cache=False):
return self._albums
def get_artist(self, artist_id):
return None
class _FakeCursor:
def __init__(self):
self.queries = []
def execute(self, sql, params=None):
self.queries.append((sql, params))
def fetchall(self):
return []
class _FakeConn:
def __init__(self):
self.cur = _FakeCursor()
def cursor(self):
return self.cur
def __enter__(self):
return self
def __exit__(self, *args):
return False
class _FakeDB:
def __init__(self, playlist=None, tracks=None):
self._playlist = playlist
self._tracks = tracks or []
self.marked_explored = False
def get_mirrored_playlist(self, pid):
return self._playlist
def get_mirrored_playlist_tracks(self, pid):
return self._tracks
def mark_mirrored_playlist_explored(self, pid):
self.marked_explored = True
def _get_connection(self):
return _FakeConn()
class _FakeCache:
def __init__(self):
self.entries = {}
def get_entity(self, source, kind, key):
return self.entries.get((source, kind, key))
def store_entity(self, source, kind, key, value):
self.entries[(source, kind, key)] = value
def _build_deps(
*,
request_payload=None,
spotify=None,
db=None,
discovery_source='spotify',
fallback_source='itunes',
cache=None,
):
deps = ex.PlaylistExplorerDeps(
request=_FakeRequest(request_payload or {}),
flask_response=_FakeResponse,
flask_jsonify=_fake_jsonify,
spotify_client=spotify or _FakeSpotify(),
get_database=lambda: db or _FakeDB(),
get_active_discovery_source=lambda: discovery_source,
get_metadata_fallback_client=lambda: spotify or _FakeSpotify(),
get_metadata_fallback_source=lambda: fallback_source,
get_metadata_cache=lambda: cache or _FakeCache(),
)
return deps
# ---------------------------------------------------------------------------
# Validation early exits
# ---------------------------------------------------------------------------
def test_no_data_returns_400():
"""Empty/None request body → 400."""
deps = _build_deps(request_payload=None)
deps.request = _FakeRequest(None)
result = ex.playlist_explorer_build_tree(deps)
payload, status = result
assert status == 400
assert payload == {"success": False, "error": "No data provided"}
def test_missing_playlist_id_returns_400():
"""Payload without playlist_id → 400."""
deps = _build_deps(request_payload={'mode': 'albums'})
payload, status = ex.playlist_explorer_build_tree(deps)
assert status == 400
assert 'playlist_id' in payload['error']
def test_invalid_mode_returns_400():
"""mode != 'albums'/'discographies' → 400."""
deps = _build_deps(request_payload={'playlist_id': '1', 'mode': 'invalid'})
payload, status = ex.playlist_explorer_build_tree(deps)
assert status == 400
assert "'albums' or 'discographies'" in payload['error']
def test_playlist_not_found_returns_404():
"""Database returns no playlist for the given ID → 404."""
db = _FakeDB(playlist=None)
deps = _build_deps(request_payload={'playlist_id': '99'}, db=db)
payload, status = ex.playlist_explorer_build_tree(deps)
assert status == 404
def test_playlist_with_no_tracks_returns_400():
"""Playlist found but has no tracks → 400."""
db = _FakeDB(playlist={'name': 'P', 'image_url': ''}, tracks=[])
deps = _build_deps(request_payload={'playlist_id': '1'}, db=db)
payload, status = ex.playlist_explorer_build_tree(deps)
assert status == 400
# ---------------------------------------------------------------------------
# Streaming response
# ---------------------------------------------------------------------------
def test_success_returns_streaming_response_with_meta_line():
"""Successful build → Response wrapper with NDJSON generator that starts with 'meta'."""
db = _FakeDB(
playlist={'name': 'My Playlist', 'image_url': 'http://img'},
tracks=[{'track_name': 'T1', 'artist_name': 'Artist One', 'album_name': 'Album X', 'extra_data': None}],
)
spotify = _FakeSpotify(
search_results=[_FakeArtistMeta(name='Artist One', id='a-1')],
albums=[_FakeAlbum(id='alb-1', name='Album X', release_date='2024')],
)
deps = _build_deps(
request_payload={'playlist_id': '1', 'mode': 'discographies'},
spotify=spotify,
db=db,
)
response = ex.playlist_explorer_build_tree(deps)
# _FakeResponse exposes body_lines pre-collected from the generator
assert response.mimetype == 'application/x-ndjson'
lines = response.body_lines
assert len(lines) >= 2
# First line should be meta
first = json.loads(lines[0])
assert first['type'] == 'meta'
assert first['playlist_name'] == 'My Playlist'
assert first['source'] == 'spotify'
# Last line should be 'complete'
last = json.loads(lines[-1])
assert last['type'] == 'complete'
def test_marks_playlist_explored_at_end_of_stream():
"""When the streaming generator runs to completion, mark_mirrored_playlist_explored fires."""
db = _FakeDB(
playlist={'name': 'P', 'image_url': ''},
tracks=[{'track_name': 'T', 'artist_name': 'A', 'album_name': 'B', 'extra_data': None}],
)
spotify = _FakeSpotify(search_results=[_FakeArtistMeta(name='A')])
deps = _build_deps(
request_payload={'playlist_id': '1', 'mode': 'discographies'},
spotify=spotify,
db=db,
)
ex.playlist_explorer_build_tree(deps)
assert db.marked_explored is True
# ---------------------------------------------------------------------------
# Discovered-track grouping
# ---------------------------------------------------------------------------
def test_discovered_artist_grouping_uses_matched_data():
"""Tracks with matching-source extra_data → use matched_data['artists'][0]."""
db = _FakeDB(
playlist={'name': 'P', 'image_url': ''},
tracks=[
{
'track_name': 'T',
'artist_name': 'Local Artist Name', # raw
'album_name': 'Local Album',
'extra_data': json.dumps({
'discovered': True,
'provider': 'spotify',
'matched_data': {
'artists': [{'name': 'Discovered Artist', 'id': 'sp-aid'}],
'album': {'name': 'Discovered Album'},
},
}),
},
],
)
spotify = _FakeSpotify(
search_results=[_FakeArtistMeta(name='Discovered Artist')],
albums=[_FakeAlbum(id='alb-1', name='Discovered Album', release_date='2024')],
)
deps = _build_deps(
request_payload={'playlist_id': '1', 'mode': 'discographies'},
spotify=spotify, db=db,
)
response = ex.playlist_explorer_build_tree(deps)
artist_lines = [json.loads(line) for line in response.body_lines if json.loads(line).get('type') == 'artist']
assert len(artist_lines) == 1
assert artist_lines[0]['name'] == 'Discovered Artist'
assert artist_lines[0]['artist_id'] == 'sp-aid'
def test_provider_mismatch_falls_back_to_raw_track_name():
"""If discovered provider != active source, ignore matched_data, use raw artist_name."""
db = _FakeDB(
playlist={'name': 'P', 'image_url': ''},
tracks=[
{
'track_name': 'T',
'artist_name': 'Raw Artist',
'album_name': 'Raw Album',
'extra_data': json.dumps({
'discovered': True,
'provider': 'itunes', # mismatch
'matched_data': {
'artists': [{'name': 'iTunes Artist'}],
},
}),
},
],
)
# Active source is spotify (default)
spotify = _FakeSpotify(search_results=[_FakeArtistMeta(name='Raw Artist')])
deps = _build_deps(
request_payload={'playlist_id': '1', 'mode': 'discographies'},
spotify=spotify, db=db,
)
response = ex.playlist_explorer_build_tree(deps)
artist_lines = [json.loads(line) for line in response.body_lines if json.loads(line).get('type') == 'artist']
assert artist_lines[0]['name'] == 'Raw Artist' # NOT 'iTunes Artist'

@ -0,0 +1,174 @@
"""Tests for core/streaming/prepare.py — stream-prep worker."""
from __future__ import annotations
import threading
import pytest
from core.streaming import prepare as sp
class _FakeSoulseek:
"""Minimal soulseek_client stub for the stream-prep worker."""
def __init__(self, *, download_id='dl-1', all_downloads=None):
self._download_id = download_id
self._all_downloads = all_downloads if all_downloads is not None else []
async def download(self, username, filename, size):
return self._download_id
async def get_all_downloads(self):
return self._all_downloads
async def signal_download_completion(self, download_id, username, remove=True):
return True
def _build_deps(
*,
state=None,
soulseek=None,
project_root='/tmp/proj',
find_streaming_result=None,
find_downloaded_result=None,
):
state = state if state is not None else {}
deps = sp.PrepareStreamDeps(
config_manager=type('C', (), {'get': lambda self, k, d=None: d})(),
soulseek_client=soulseek or _FakeSoulseek(),
stream_lock=threading.Lock(),
project_root=project_root,
docker_resolve_path=lambda p: p,
find_streaming_download_in_all_downloads=lambda all_dl, td: find_streaming_result,
find_downloaded_file=lambda dl_path, td: find_downloaded_result,
extract_filename=lambda fp: __import__('os').path.basename(fp),
cleanup_empty_directories=lambda dl_path, found_file: None,
_get_stream_state=lambda: state,
_set_stream_state=lambda v: state.clear() or state.update(v),
)
deps._state = state
return deps
# ---------------------------------------------------------------------------
# Initial state setup
# ---------------------------------------------------------------------------
def test_state_starts_loading_with_track_info(tmp_path):
"""First action sets state to 'loading' with the track_info."""
sk = _FakeSoulseek(download_id=None) # forces an early "Failed to initiate" exit
deps = _build_deps(soulseek=sk, project_root=str(tmp_path))
track_data = {'username': 'u', 'filename': 'song.flac', 'size': 1000}
sp.prepare_stream_task(track_data, deps)
# First mutation set status='loading', track_info=track_data
# Then early exit because download() returned None — state ends up 'error'
assert deps._state['status'] == 'error'
assert 'Failed to initiate' in deps._state['error_message']
def test_stream_folder_created(tmp_path):
"""Stream/ subfolder is created under project_root."""
sk = _FakeSoulseek(download_id=None)
deps = _build_deps(soulseek=sk, project_root=str(tmp_path))
sp.prepare_stream_task({'username': 'u', 'filename': 'x', 'size': 0}, deps)
assert (tmp_path / 'Stream').is_dir()
def test_stream_folder_cleared_before_download(tmp_path):
"""Existing files in Stream/ are removed before each prepare."""
stream_dir = tmp_path / 'Stream'
stream_dir.mkdir()
old_file = stream_dir / 'old.flac'
old_file.write_bytes(b'old data')
assert old_file.exists()
sk = _FakeSoulseek(download_id=None)
deps = _build_deps(soulseek=sk, project_root=str(tmp_path))
sp.prepare_stream_task({'username': 'u', 'filename': 'x', 'size': 0}, deps)
# Old file gone (cleared at start of prep)
assert not old_file.exists()
# ---------------------------------------------------------------------------
# Download initiation failure
# ---------------------------------------------------------------------------
def test_download_returns_none_marks_error(tmp_path):
"""soulseek_client.download() returning None → state.error."""
sk = _FakeSoulseek(download_id=None)
deps = _build_deps(soulseek=sk, project_root=str(tmp_path))
sp.prepare_stream_task({'username': 'u', 'filename': 'x', 'size': 0}, deps)
assert deps._state['status'] == 'error'
# ---------------------------------------------------------------------------
# Successful completion
# ---------------------------------------------------------------------------
def test_completed_download_moves_to_stream_and_marks_ready(tmp_path):
"""When the polled status reports succeeded + bytes match, file moved + state ready."""
download_path = tmp_path / 'downloads'
download_path.mkdir()
src_file = download_path / 'song.flac'
src_file.write_bytes(b'audio')
download_status = {
'id': 'dl-99',
'state': 'Succeeded',
'percentComplete': 100,
'size': 5,
'bytesTransferred': 5,
}
sk = _FakeSoulseek(download_id='dl-99', all_downloads=['stub'])
deps = _build_deps(
soulseek=sk,
project_root=str(tmp_path),
find_streaming_result=download_status,
find_downloaded_result=str(src_file),
)
deps.config_manager = type('C', (), {
'get': lambda self, k, d=None: str(download_path) if k == 'soulseek.download_path' else d,
})()
sp.prepare_stream_task(
{'username': 'u', 'filename': 'song.flac', 'size': 5},
deps,
)
assert deps._state['status'] == 'ready'
assert deps._state['progress'] == 100
assert (tmp_path / 'Stream' / 'song.flac').exists()
assert deps._state['file_path'] == str(tmp_path / 'Stream' / 'song.flac')
def test_succeeded_state_with_partial_bytes_keeps_polling(tmp_path):
"""If state is 'Succeeded' but bytes < size, marks _incomplete_warned and continues."""
download_status = {
'id': 'dl-99',
'state': 'Succeeded',
'percentComplete': 100,
'size': 100,
'bytesTransferred': 50, # incomplete
}
sk = _FakeSoulseek(download_id='dl-99', all_downloads=['stub'])
deps = _build_deps(
soulseek=sk,
project_root=str(tmp_path),
find_streaming_result=download_status,
)
# Force quick exit by capping the loop with no further state change
# Worker times out via max_wait_time in real code — we just verify state didn't go ready
sp.prepare_stream_task({'username': 'u', 'filename': 'x', 'size': 100}, deps)
# Should NOT have gone to 'ready' because bytes were incomplete
assert deps._state['status'] != 'ready'

@ -10,8 +10,6 @@ import subprocess
import platform
import threading
import time
import shutil
import glob
import uuid
import re
import sqlite3
@ -3780,264 +3778,37 @@ _EDITION_BARE_RE = _re.compile(
_re.IGNORECASE
)
def _prepare_stream_task(track_data):
"""
Background streaming task that downloads track to Stream folder and updates global state.
Enhanced version with robust error handling matching the GUI StreamingThread.
"""
loop = None
queue_start_time = None
actively_downloading = False
last_progress_sent = 0.0
try:
logger.info(f"Starting stream preparation for: {track_data.get('filename')}")
# Update state to loading
with stream_lock:
stream_state.update({
"status": "loading",
"progress": 0,
"track_info": track_data,
"file_path": None,
"error_message": None
})
# Get paths
download_path = docker_resolve_path(config_manager.get('soulseek.download_path', './downloads'))
project_root = os.path.dirname(os.path.abspath(__file__))
stream_folder = os.path.join(project_root, 'Stream')
# Ensure Stream directory exists
os.makedirs(stream_folder, exist_ok=True)
# Clear any existing files in Stream folder (only one file at a time)
for existing_file in glob.glob(os.path.join(stream_folder, '*')):
try:
if os.path.isfile(existing_file):
os.remove(existing_file)
elif os.path.isdir(existing_file):
shutil.rmtree(existing_file)
logger.info(f"Cleared old stream file: {existing_file}")
except Exception as e:
logger.error(f"Could not remove existing stream file: {e}")
# Start the download using the same mechanism as regular downloads
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
download_result = loop.run_until_complete(soulseek_client.download(
track_data.get('username'),
track_data.get('filename'),
track_data.get('size', 0)
))
if not download_result:
with stream_lock:
stream_state.update({
"status": "error",
"error_message": "Failed to initiate download - uploader may be offline"
})
return
logger.info("Download initiated for streaming")
# Enhanced monitoring with queue timeout detection (matching GUI)
max_wait_time = 60 # Increased timeout
poll_interval = 1.5 # More frequent polling
queue_timeout = 15 # Queue timeout like GUI
wait_count = 0
while wait_count * poll_interval < max_wait_time:
wait_count += 1
# Check download progress via orchestrator (works for Soulseek and YouTube)
api_progress = None
download_state = None
download_status = None
# Stream-prep worker logic lives in core/streaming/prepare.py.
from core.streaming import prepare as _streaming_prepare
try:
# Use orchestrator's get_all_downloads() which works for both sources
all_downloads = loop.run_until_complete(soulseek_client.get_all_downloads())
download_status = _find_streaming_download_in_all_downloads(all_downloads, track_data)
if download_status:
api_progress = download_status.get('percentComplete', 0)
download_state = download_status.get('state', '').lower()
original_state = download_status.get('state', '')
logger.info(f"API Download - State: {original_state}, Progress: {api_progress:.1f}%")
# Track queue state timing (matching GUI logic)
is_queued = ('queued' in download_state or 'initializing' in download_state)
is_downloading = ('inprogress' in download_state or 'transferring' in download_state)
# Verify bytes match before trusting state/progress
_stream_expected = download_status.get('size', 0)
_stream_transferred = download_status.get('bytesTransferred', 0)
_bytes_ok = _stream_expected <= 0 or _stream_transferred >= _stream_expected
is_completed = ('succeeded' in download_state or api_progress >= 100) and _bytes_ok
# Handle queue state timing
if is_queued and queue_start_time is None:
queue_start_time = time.time()
logger.info(f"Download entered queue state: {original_state}")
with stream_lock:
stream_state["status"] = "queued"
elif is_downloading and not actively_downloading:
actively_downloading = True
queue_start_time = None # Reset queue timer
logger.info(f"Download started actively downloading: {original_state}")
with stream_lock:
stream_state["status"] = "loading"
# Check for queue timeout (matching GUI)
if is_queued and queue_start_time:
queue_elapsed = time.time() - queue_start_time
if queue_elapsed > queue_timeout:
logger.error(f"⏰ Queue timeout after {queue_elapsed:.1f}s - download stuck in queue")
with stream_lock:
stream_state.update({
"status": "error",
"error_message": "Queue timeout - uploader not responding. Try another source."
})
return
# Update progress
with stream_lock:
if api_progress != last_progress_sent:
stream_state["progress"] = api_progress
last_progress_sent = api_progress
# Check if download is complete
if is_completed:
logger.info(f"Download completed via API status: {original_state}")
# Wait for file to stabilise on disk before moving
found_file = _find_downloaded_file(download_path, track_data)
if found_file:
_prev_sz = -1
for _sc in range(4):
try:
_cur_sz = os.path.getsize(found_file)
except OSError:
_cur_sz = -1
if _cur_sz == _prev_sz and _cur_sz > 0:
break
_prev_sz = _cur_sz
time.sleep(1.5)
# Re-find in case it wasn't found on first try
if not found_file:
found_file = _find_downloaded_file(download_path, track_data)
# Retry file search a few times (matching GUI logic)
retry_attempts = 5
for attempt in range(retry_attempts):
if found_file:
break
logger.warning(f"File not found yet, attempt {attempt + 1}/{retry_attempts}")
time.sleep(1)
found_file = _find_downloaded_file(download_path, track_data)
if found_file:
logger.debug(f"Found downloaded file: {found_file}")
# Move file to Stream folder
original_filename = extract_filename(found_file)
stream_path = os.path.join(stream_folder, original_filename)
try:
shutil.move(found_file, stream_path)
logger.debug(f"Moved file to stream folder: {stream_path}")
# Clean up empty directories (matching GUI)
_cleanup_empty_directories(download_path, found_file)
# Update state to ready
with stream_lock:
stream_state.update({
"status": "ready",
"progress": 100,
"file_path": stream_path
})
# Clean up download from slskd API
try:
download_id = download_status.get('id', '')
if download_id and track_data.get('username'):
success = loop.run_until_complete(
soulseek_client.signal_download_completion(
download_id, track_data.get('username'), remove=True)
)
if success:
logger.debug(f"Cleaned up download {download_id} from API")
except Exception as e:
logger.error(f"Error cleaning up download: {e}")
logger.info(f"Stream file ready for playback: {stream_path}")
return # Success!
except Exception as e:
logger.error(f"Error moving file to stream folder: {e}")
with stream_lock:
stream_state.update({
"status": "error",
"error_message": f"Failed to prepare stream file: {e}"
})
return
else:
logger.error("Could not find downloaded file after completion")
with stream_lock:
stream_state.update({
"status": "error",
"error_message": "Download completed but file not found"
})
return
else:
# No transfer found in API - may still be initializing
logger.debug(f"No transfer found in API yet... (elapsed: {wait_count * poll_interval}s)")
except Exception as e:
logger.error(f"Error checking download progress: {e}")
# Continue to next iteration if API call fails
# Wait before next poll
time.sleep(poll_interval)
# If we get here, download timed out
logger.warning(f"Download timed out after {max_wait_time}s")
with stream_lock:
stream_state.update({
"status": "error",
"error_message": "Download timed out - try a different source"
})
except asyncio.CancelledError:
logger.warning("Stream task cancelled")
with stream_lock:
stream_state.update({
"status": "stopped",
"error_message": None
})
finally:
if loop:
try:
# Clean up any pending tasks
pending = asyncio.all_tasks(loop)
if pending:
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
loop.close()
except Exception as e:
logger.error(f"Error cleaning up streaming event loop: {e}")
except Exception as e:
logger.error(f"Stream preparation failed: {e}")
with stream_lock:
stream_state.update({
"status": "error",
"error_message": f"Streaming error: {str(e)}"
})
def _build_prepare_stream_deps():
"""Build the PrepareStreamDeps bundle from web_server.py globals on each call."""
def _get_stream_state():
return stream_state
def _set_stream_state(value):
global stream_state
stream_state = value
return _streaming_prepare.PrepareStreamDeps(
config_manager=config_manager,
soulseek_client=soulseek_client,
stream_lock=stream_lock,
project_root=os.path.dirname(os.path.abspath(__file__)),
docker_resolve_path=docker_resolve_path,
find_streaming_download_in_all_downloads=_find_streaming_download_in_all_downloads,
find_downloaded_file=_find_downloaded_file,
extract_filename=extract_filename,
cleanup_empty_directories=_cleanup_empty_directories,
_get_stream_state=_get_stream_state,
_set_stream_state=_set_stream_state,
)
def _prepare_stream_task(track_data):
return _streaming_prepare.prepare_stream_task(track_data, _build_prepare_stream_deps())
def _find_streaming_download_in_all_downloads(all_downloads, track_data):
"""
@ -36056,310 +35827,29 @@ def get_mirrored_discovery_states():
# PLAYLIST EXPLORER
# ================================================================================================
@app.route('/api/playlist-explorer/build-tree', methods=['POST'])
def playlist_explorer_build_tree():
"""Build a discovery tree from a mirrored playlist.
Streams NDJSON: one line per artist with their albums.
Works with Spotify, iTunes, or Deezer as the metadata source.
Uses and populates the metadata cache to avoid redundant API calls."""
try:
data = request.get_json()
if not data:
return jsonify({"success": False, "error": "No data provided"}), 400
playlist_id = data.get('playlist_id')
mode = data.get('mode', 'albums') # 'albums' or 'discographies'
if not playlist_id:
return jsonify({"success": False, "error": "playlist_id is required"}), 400
if mode not in ('albums', 'discographies'):
return jsonify({"success": False, "error": "mode must be 'albums' or 'discographies'"}), 400
database = get_database()
playlist = database.get_mirrored_playlist(playlist_id)
if not playlist:
return jsonify({"success": False, "error": "Playlist not found"}), 404
tracks = database.get_mirrored_playlist_tracks(playlist_id)
if not tracks:
return jsonify({"success": False, "error": "Playlist has no tracks"}), 400
# Determine active metadata source — respect user's configured primary
source_name = _get_active_discovery_source()
if source_name == 'spotify' and spotify_client and spotify_client.is_spotify_authenticated():
active_client = spotify_client
else:
active_client = _get_metadata_fallback_client()
source_name = _get_metadata_fallback_source()
cache = get_metadata_cache()
# Parse extra_data and group tracks by artist using discovered data
artist_groups = {}
for t in tracks:
extra = {}
if t.get('extra_data'):
try:
extra = json.loads(t['extra_data']) if isinstance(t['extra_data'], str) else t['extra_data']
except (json.JSONDecodeError, TypeError):
pass
# Only use discovery data if it matches the active metadata source
is_discovered = extra.get('discovered', False)
provider = (extra.get('provider') or '').lower()
source_matches = provider == source_name or (provider in ('itunes', 'apple') and source_name == 'itunes')
matched = extra.get('matched_data', {}) if (is_discovered and source_matches) else {}
artists_list = matched.get('artists', [])
primary_artist = artists_list[0] if artists_list else None
# Artists can be dicts {"name": "X", "id": "Y"} or plain strings "X"
if isinstance(primary_artist, dict):
artist_name = primary_artist.get('name') or (t.get('artist_name') or '').strip()
artist_id = primary_artist.get('id') or None
elif isinstance(primary_artist, str):
artist_name = primary_artist or (t.get('artist_name') or '').strip()
artist_id = None
else:
artist_name = (t.get('artist_name') or '').strip()
artist_id = None
if not artist_name:
continue
key = artist_name.lower()
if key not in artist_groups:
artist_groups[key] = {
'name': artist_name,
'artist_id': artist_id, # Pre-resolved from discovery
'tracks': [],
'album_names': set(),
'discovered': extra.get('discovered', False),
}
# If we get an artist_id from a later track but didn't have one before, fill it in
if artist_id and not artist_groups[key].get('artist_id'):
artist_groups[key]['artist_id'] = artist_id
artist_groups[key]['tracks'].append(t.get('track_name', ''))
# Get album name from discovered data or playlist field
album_name = ''
album_data = matched.get('album')
if isinstance(album_data, dict) and album_data.get('name'):
album_name = album_data['name']
elif (t.get('album_name') or '').strip():
album_name = t['album_name'].strip()
if album_name:
artist_groups[key]['album_names'].add(album_name)
def _normalize_for_match(title):
import re
return re.sub(r'\s*[\(\[][^)\]]*[\)\]]', '', title).strip().lower()
def _fetch_artist_discography(artist_name, known_artist_id=None):
"""Fetch discography using the active client. Checks cache first, stores results after.
If known_artist_id is provided (from discovery cache), skips the name search."""
# Check cache for this artist's discography
cache_key = f"explorer_disco_{artist_name.lower().strip()}"
cached = cache.get_entity(source_name, 'artist_discography', cache_key) if cache else None
if cached and isinstance(cached, dict) and cached.get('albums'):
logger.debug(f"Explorer: cache hit for '{artist_name}' discography")
return cached
artist_id = known_artist_id
artist_image = None
if artist_id:
# Already have the ID from discovery — just fetch the artist image
try:
artist_info = active_client.get_artist(artist_id)
if artist_info:
if isinstance(artist_info, dict):
images = artist_info.get('images') or []
artist_image = images[0].get('url') if images else None
elif hasattr(artist_info, 'image_url'):
artist_image = artist_info.image_url
except Exception:
pass
else:
# No pre-resolved ID — search by name
try:
search_results = active_client.search_artists(artist_name, limit=5)
except Exception as e:
return {'success': False, 'error': f'Search failed: {e}'}
if not search_results:
return {'success': False, 'error': f'"{artist_name}" not found'}
# Find best match (exact first, then fuzzy)
best = None
for a in search_results:
if a.name.lower().strip() == artist_name.lower().strip():
best = a
break
if not best:
best = search_results[0]
artist_id = best.id
artist_image = best.image_url if hasattr(best, 'image_url') else None
# Fetch albums
try:
# skip_cache only supported by spotify_client — other clients don't cache this call
_skip = {'skip_cache': True} if hasattr(active_client, 'sp') else {}
all_albums = active_client.get_artist_albums(artist_id, album_type='album,single', **_skip)
except Exception as e:
return {'success': False, 'error': f'Album fetch failed: {e}'}
if not all_albums:
return {'success': False, 'error': 'No albums found'}
# Check which albums the user already owns
owned_titles = set()
try:
db = get_database()
with db._get_connection() as conn:
cursor = conn.cursor()
# Find all artists in DB matching this name
cursor.execute("SELECT id FROM artists WHERE LOWER(name) = LOWER(?)", (artist_name,))
artist_rows = cursor.fetchall()
for ar in artist_rows:
cursor.execute("SELECT title FROM albums WHERE artist_id = ?", (ar['id'],))
for alb_row in cursor.fetchall():
owned_titles.add((alb_row['title'] or '').strip().lower())
except Exception:
pass # Non-critical — owned badges just won't show
# Build release list
releases = []
for album in all_albums:
# Skip albums where this artist isn't primary
if hasattr(album, 'artist_ids') and album.artist_ids and album.artist_ids[0] != artist_id:
continue
releases.append({
'title': album.name,
'year': album.release_date[:4] if album.release_date else None,
'image_url': album.image_url,
'spotify_id': album.id,
'track_count': album.total_tracks,
'album_type': (album.album_type or 'album').lower(),
'owned': (album.name or '').strip().lower() in owned_titles,
})
result = {
'success': True,
'name': artist_name, # Required for metadata cache validation
'albums': releases,
'artist_image': artist_image,
'artist_id': artist_id,
'artist_name': artist_name,
}
# Store in cache
if cache and releases:
try:
cache.store_entity(source_name, 'artist_discography', cache_key, result)
except Exception:
pass
return result
def generate():
yield json.dumps({
"type": "meta",
"playlist_name": playlist.get('name', 'Unknown Playlist'),
"playlist_image": playlist.get('image_url', ''),
"total_artists": len(artist_groups),
"total_tracks": len(tracks),
"source": source_name,
}) + '\n'
total_albums = 0
for idx, (_key, group) in enumerate(artist_groups.items()):
artist_name = group['name']
playlist_track_names = group['tracks']
playlist_album_names = group['album_names']
try:
disco = _fetch_artist_discography(artist_name, group.get('artist_id'))
if not disco.get('success'):
yield json.dumps({
"type": "artist",
"name": artist_name,
"artist_id": None,
"image_url": None,
"playlist_tracks": playlist_track_names,
"albums": [],
"error": disco.get('error', 'Not found'),
}) + '\n'
time.sleep(0.1)
continue
# Tag each release with in_playlist flag
# If no album names available, fall back to matching track names against single titles
match_names = playlist_album_names
if not match_names:
match_names = set(playlist_track_names)
all_releases = []
for release in disco.get('albums', []):
r = dict(release)
norm_title = _normalize_for_match(r['title'])
r['in_playlist'] = any(
_normalize_for_match(a) == norm_title or
norm_title in _normalize_for_match(a) or
_normalize_for_match(a) in norm_title
for a in match_names
)
all_releases.append(r)
# Filter based on mode
if mode == 'albums':
filtered = [r for r in all_releases if r['in_playlist']]
else:
filtered = all_releases
# Playlist explorer build-tree route lives in core/playlists/explorer.py.
from core.playlists import explorer as _playlists_explorer
filtered.sort(key=lambda r: (not r.get('in_playlist', False), -(int(r.get('year') or 0))))
total_albums += len(filtered)
yield json.dumps({
"type": "artist",
"name": disco.get('artist_name', artist_name),
"artist_id": disco.get('artist_id'),
"image_url": disco.get('artist_image'),
"playlist_tracks": playlist_track_names,
"albums": filtered,
}) + '\n'
except Exception as e:
logger.error(f"Explorer: error processing artist '{artist_name}': {e}")
yield json.dumps({
"type": "artist",
"name": artist_name,
"artist_id": None,
"image_url": None,
"playlist_tracks": playlist_track_names,
"albums": [],
"error": str(e),
}) + '\n'
# Rate limit protection between artists
if idx < len(artist_groups) - 1:
time.sleep(0.2)
def _build_playlist_explorer_deps():
"""Build the PlaylistExplorerDeps bundle from web_server.py globals on each call."""
return _playlists_explorer.PlaylistExplorerDeps(
request=request,
flask_response=Response,
flask_jsonify=jsonify,
spotify_client=spotify_client,
get_database=get_database,
get_active_discovery_source=_get_active_discovery_source,
get_metadata_fallback_client=_get_metadata_fallback_client,
get_metadata_fallback_source=_get_metadata_fallback_source,
get_metadata_cache=get_metadata_cache,
)
get_database().mark_mirrored_playlist_explored(playlist_id)
yield json.dumps({"type": "complete", "total_artists": len(artist_groups), "total_albums": total_albums}) + '\n'
return Response(generate(), mimetype='application/x-ndjson', headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no',
})
@app.route('/api/playlist-explorer/build-tree', methods=['POST'])
def playlist_explorer_build_tree():
return _playlists_explorer.playlist_explorer_build_tree(_build_playlist_explorer_deps())
except Exception as e:
logger.error(f"Playlist Explorer build-tree error: {e}")
import traceback
traceback.print_exc()
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/playlist-explorer/album-tracks/<album_id>', methods=['GET'])

Loading…
Cancel
Save