Make wishlist respect configured providers

- add neutral wishlist payload helpers while keeping legacy Spotify aliases
- route wishlist removal and classification through generic track data
- keep API and service compatibility for existing callers
pull/435/head
Antti Kettunen 3 weeks ago
parent 167718d694
commit 0fa692f935
No known key found for this signature in database
GPG Key ID: C6B2A3D250359BD7

@ -287,12 +287,12 @@ def serialize_watchlist_artist(obj, fields: Optional[Set[str]] = None) -> dict:
def serialize_wishlist_track(obj, fields: Optional[Set[str]] = None) -> dict:
"""Standardized wishlist track serialization."""
d = _to_dict(obj)
spotify_data = d.get("spotify_data", {})
if isinstance(spotify_data, str):
track_data = d.get("track_data", d.get("spotify_data", {}))
if isinstance(track_data, str):
try:
spotify_data = json.loads(spotify_data)
track_data = json.loads(track_data)
except (json.JSONDecodeError, TypeError):
spotify_data = {}
track_data = {}
source_info = d.get("source_info")
if isinstance(source_info, str):
@ -303,17 +303,23 @@ def serialize_wishlist_track(obj, fields: Optional[Set[str]] = None) -> dict:
result = {
"id": d.get("id"),
"track_id": d.get("track_id") or d.get("spotify_track_id") or d.get("id"),
"spotify_track_id": d.get("spotify_track_id"),
"track_name": spotify_data.get("name", "Unknown") if isinstance(spotify_data, dict) else "Unknown",
"track_name": (
track_data.get("name", "Unknown") if isinstance(track_data, dict) else d.get("track_name", "Unknown")
),
"artist_name": ", ".join(
a.get("name", "") for a in spotify_data.get("artists", [])
) if isinstance(spotify_data, dict) and isinstance(spotify_data.get("artists"), list) else "",
a.get("name", "") if isinstance(a, dict) else str(a)
for a in track_data.get("artists", [])
) if isinstance(track_data, dict) and isinstance(track_data.get("artists"), list) else "",
"album_name": (
spotify_data.get("album", {}).get("name")
if isinstance(spotify_data, dict) and isinstance(spotify_data.get("album"), dict)
track_data.get("album", {}).get("name")
if isinstance(track_data, dict) and isinstance(track_data.get("album"), dict)
else None
),
"spotify_data": spotify_data,
"track_data": track_data,
"spotify_data": track_data,
"provider": track_data.get("provider") if isinstance(track_data, dict) else d.get("provider"),
"failure_reason": d.get("failure_reason"),
"retry_count": d.get("retry_count", 0),
"last_attempted": _isoformat(d.get("last_attempted")),

@ -45,16 +45,16 @@ def register_routes(bp):
def add_to_wishlist():
"""Add a track to the wishlist.
Body: {"spotify_track_data": {...}, "failure_reason": "...", "source_type": "..."}
Body: {"track_data": {...}, "failure_reason": "...", "source_type": "..."}
"""
body = request.get_json(silent=True) or {}
track_data = body.get("spotify_track_data")
track_data = body.get("track_data") or body.get("spotify_track_data")
reason = body.get("failure_reason", "Added via API")
source_type = body.get("source_type", "api")
profile_id = parse_profile_id(request)
if not track_data:
return api_error("BAD_REQUEST", "Missing 'spotify_track_data' in body.", 400)
return api_error("BAD_REQUEST", "Missing 'track_data' in body.", 400)
try:
from database.music_database import get_database
@ -74,7 +74,7 @@ def register_routes(bp):
@bp.route("/wishlist/<track_id>", methods=["DELETE"])
@require_api_key
def remove_from_wishlist(track_id):
"""Remove a track from the wishlist by its Spotify track ID."""
"""Remove a track from the wishlist by its track ID."""
profile_id = parse_profile_id(request)
try:
from database.music_database import get_database

@ -6,16 +6,32 @@ import json
from typing import Any, Dict
def _extract_track_data(track: Dict[str, Any]) -> Dict[str, Any]:
for key in ("track_data", "spotify_data", "metadata", "track"):
data = track.get(key)
if isinstance(data, str):
try:
data = json.loads(data)
except Exception:
data = {}
if isinstance(data, dict) and data:
nested = data.get("track_data") or data.get("spotify_data") or data.get("metadata") or data.get("track")
if isinstance(nested, str):
try:
nested = json.loads(nested)
except Exception:
nested = {}
if isinstance(nested, dict) and nested:
return nested
return data
return {}
def classify_wishlist_track(track: Dict[str, Any]) -> str:
"""Classify a wishlist track as `singles` or `albums`."""
spotify_data = track.get('spotify_data', {})
if isinstance(spotify_data, str):
try:
spotify_data = json.loads(spotify_data)
except Exception:
spotify_data = {}
album_data = spotify_data.get('album') or {}
track_data = _extract_track_data(track)
album_data = track_data.get('album') or {}
if not isinstance(album_data, dict):
album_data = {}
total_tracks = album_data.get('total_tracks')

@ -20,16 +20,12 @@ def sanitize_track_data_for_processing(track_data):
logger.info(f"[Sanitize] Unexpected track data type: {type(track_data)}")
return track_data
# Create a copy to avoid modifying original data
sanitized = track_data.copy()
# Handle album field - preserve dict format to retain full metadata (images, id, etc.)
# Downstream code already handles both dict and string formats defensively
raw_album = sanitized.get('album', '')
if not isinstance(raw_album, (dict, str)):
sanitized['album'] = str(raw_album)
# Handle artists field - ensure it's a list of strings
raw_artists = sanitized.get('artists', [])
if isinstance(raw_artists, list):
processed_artists = []
@ -68,25 +64,22 @@ def get_track_artist_name(track_info):
return "Unknown Artist"
def ensure_spotify_track_format(track_info):
def ensure_wishlist_track_format(track_info):
"""
Ensure track_info has proper Spotify track structure for wishlist service.
Converts webui track format to match sync.py's spotify_track format.
Ensure track_info has a consistent wishlist track structure.
This keeps the legacy Spotify-shaped fields because the download pipeline
still expects them, but the helper itself is provider-agnostic.
"""
if not track_info:
return {}
# If it already has the proper Spotify structure, return as-is
if isinstance(track_info.get('artists'), list) and len(track_info.get('artists', [])) > 0:
first_artist = track_info['artists'][0]
if isinstance(first_artist, dict) and 'name' in first_artist:
# Already has proper Spotify format
return track_info
# Convert to proper Spotify format
artists_list = []
# Handle different artist formats from webui
artists = track_info.get('artists', [])
if artists:
if isinstance(artists, list):
@ -98,22 +91,17 @@ def ensure_spotify_track_format(track_info):
else:
artists_list.append({'name': str(artist)})
else:
# Single artist as string
artists_list.append({'name': str(artists)})
else:
# Fallback: try single artist field
artist = track_info.get('artist')
if artist:
artists_list.append({'name': str(artist)})
else:
artists_list.append({'name': 'Unknown Artist'})
# Build album object - preserve ALL fields (id, release_date, total_tracks,
# album_type, images, etc.) so wishlist tracks retain full album context
# for correct folder placement, multi-disc support, and classification
album_data = track_info.get('album', {})
if isinstance(album_data, dict):
album = dict(album_data) # Copy all fields
album = dict(album_data)
album.setdefault('name', 'Unknown Album')
else:
album = {
@ -126,11 +114,10 @@ def ensure_spotify_track_format(track_info):
album.setdefault('album_type', 'album')
album.setdefault('total_tracks', 0)
# Build proper Spotify track structure
spotify_track = {
return {
'id': track_info.get('id', f"webui_{hash(str(track_info))}"),
'name': track_info.get('name', 'Unknown Track'),
'artists': artists_list, # Proper Spotify format
'artists': artists_list,
'album': album,
'duration_ms': track_info.get('duration_ms', 0),
'track_number': track_info.get('track_number', 1),
@ -138,18 +125,17 @@ def ensure_spotify_track_format(track_info):
'preview_url': track_info.get('preview_url'),
'external_urls': track_info.get('external_urls', {}),
'popularity': track_info.get('popularity', 0),
'source': 'webui_modal' # Mark as coming from webui
'source': track_info.get('source', 'webui_modal'),
}
return spotify_track
def ensure_spotify_track_format(track_info):
"""Backward-compatible wrapper for `ensure_wishlist_track_format`."""
return ensure_wishlist_track_format(track_info)
def build_cancelled_task_wishlist_payload(task, profile_id: int = 1):
"""Build the wishlist payload for a cancelled download task.
This preserves the current web_server.py behavior while moving the
data-shaping logic into the wishlist package.
"""
def build_cancelled_task_wishlist_payload(task, profile_id: int = 1):
"""Build the wishlist payload for a cancelled download task."""
if not task:
return {}
@ -170,31 +156,27 @@ def build_cancelled_task_wishlist_payload(task, profile_id: int = 1):
else:
formatted_artists.append({'name': str(artist)})
# Build album data - preserve all fields (including artists) for correct folder placement
album_raw = track_info.get('album', {})
if isinstance(album_raw, dict):
album_data = dict(album_raw) # Copy all fields including artists
album_data = dict(album_raw)
album_data.setdefault('name', 'Unknown Album')
album_data.setdefault('album_type', track_info.get('album_type', 'album'))
# Add images fallback if not present
if 'images' not in album_data and track_info.get('album_image_url'):
album_data['images'] = [{'url': track_info.get('album_image_url')}]
else:
# album is a string (album name)
album_data = {
'name': str(album_raw) if album_raw else 'Unknown Album',
'album_type': track_info.get('album_type', 'album')
'album_type': track_info.get('album_type', 'album'),
}
# Add album image if available
if track_info.get('album_image_url'):
album_data['images'] = [{'url': track_info.get('album_image_url')}]
spotify_track_data = {
track_data = {
'id': track_info.get('id'),
'name': track_info.get('name'),
'artists': formatted_artists,
'album': album_data,
'duration_ms': track_info.get('duration_ms')
'duration_ms': track_info.get('duration_ms'),
}
source_context = {
@ -204,7 +186,8 @@ def build_cancelled_task_wishlist_payload(task, profile_id: int = 1):
}
return {
'spotify_track_data': spotify_track_data,
'spotify_track_data': track_data,
'track_data': track_data,
'failure_reason': 'Download cancelled by user (v2)',
'source_type': 'playlist',
'source_context': source_context,
@ -228,76 +211,31 @@ def build_failed_track_wishlist_context(
'track_name': track_info.get('name', 'Unknown Track'),
'artist_name': get_track_artist_name(track_info),
'retry_count': retry_count,
'spotify_track': ensure_spotify_track_format(track_info),
'spotify_track': ensure_wishlist_track_format(track_info),
'track_data': ensure_wishlist_track_format(track_info),
'failure_reason': failure_reason,
'candidates': list(candidates or []),
}
def extract_spotify_track_from_modal_info(track_info: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
Extract Spotify track data from modal track_info structure.
Handles different formats from sync.py and artists.py modals.
"""
try:
# Try to find Spotify track data in various locations within track_info
# Check if we have direct Spotify track reference
if "spotify_track" in track_info and track_info["spotify_track"]:
spotify_track = track_info["spotify_track"]
# Convert to dictionary if it's an object
if hasattr(spotify_track, "__dict__"):
return spotify_track_object_to_dict(spotify_track)
if isinstance(spotify_track, dict):
return spotify_track
# Check if we have slskd_result with embedded metadata
if "slskd_result" in track_info and track_info["slskd_result"]:
slskd_result = track_info["slskd_result"]
# Look for Spotify metadata in the result
if hasattr(slskd_result, "artist") and hasattr(slskd_result, "title"):
album_name = getattr(slskd_result, "album", "") or getattr(slskd_result, "title", "Unknown Album")
return {
"id": f"reconstructed_{hash(f'{slskd_result.artist}_{slskd_result.title}')}",
"name": getattr(slskd_result, "title", "Unknown Track"),
"artists": [{"name": getattr(slskd_result, "artist", "Unknown Artist")}],
"album": {"name": album_name, "images": [], "album_type": "single", "total_tracks": 1},
"duration_ms": 0,
"reconstructed": True,
}
# If no Spotify data found, try to reconstruct from available info
logger.warning("Could not find Spotify track data in modal info, attempting reconstruction")
return None
except Exception as e:
logger.error(f"Error extracting Spotify track from modal info: {e}")
return None
def spotify_track_object_to_dict(spotify_track) -> Dict[str, Any]:
"""Convert a Spotify track object or TrackResult object to a dictionary."""
def track_object_to_dict(track_object) -> Dict[str, Any]:
"""Convert a track object or TrackResult object to a dictionary."""
try:
logger.debug(
"Converting track object to dict: type=%s has_title=%s has_artist=%s has_id=%s",
type(spotify_track),
hasattr(spotify_track, "title"),
hasattr(spotify_track, "artist"),
hasattr(spotify_track, "id"),
type(track_object),
hasattr(track_object, "title"),
hasattr(track_object, "artist"),
hasattr(track_object, "id"),
)
# Check if this is a TrackResult object (has title/artist but no id)
if hasattr(spotify_track, "title") and hasattr(spotify_track, "artist") and not hasattr(spotify_track, "id"):
if hasattr(track_object, "title") and hasattr(track_object, "artist") and not hasattr(track_object, "id"):
logger.debug("Detected TrackResult object, converting")
# Handle TrackResult objects - these don't have Spotify IDs
album_name = getattr(spotify_track, "album", "") or getattr(spotify_track, "title", "Unknown Album")
album_name = getattr(track_object, "album", "") or getattr(track_object, "title", "Unknown Album")
result = {
"id": f"trackresult_{hash(f'{spotify_track.artist}_{spotify_track.title}')}",
"name": getattr(spotify_track, "title", "Unknown Track"),
"artists": [{"name": getattr(spotify_track, "artist", "Unknown Artist")}],
"id": f"trackresult_{hash(f'{track_object.artist}_{track_object.title}')}",
"name": getattr(track_object, "title", "Unknown Track"),
"artists": [{"name": getattr(track_object, "artist", "Unknown Artist")}],
"album": {"name": album_name, "images": [], "album_type": "single", "total_tracks": 1},
"duration_ms": 0,
"preview_url": None,
@ -312,12 +250,10 @@ def spotify_track_object_to_dict(spotify_track) -> Dict[str, Any]:
)
return result
# Handle regular Spotify Track objects
logger.debug("Processing as Spotify Track object")
logger.debug("Processing as track object")
# Handle artists list carefully to avoid TrackResult serialization issues
artists_list = []
raw_artists = getattr(spotify_track, "artists", [])
raw_artists = getattr(track_object, "artists", [])
logger.debug("Raw artists: %r (type=%s)", raw_artists, type(raw_artists))
for artist in raw_artists:
@ -327,47 +263,43 @@ def spotify_track_object_to_dict(spotify_track) -> Dict[str, Any]:
elif isinstance(artist, str):
artists_list.append({"name": artist})
else:
# Convert any complex objects to string to avoid serialization issues
artists_list.append({"name": str(artist)})
# Handle album safely
album_name = "Unknown Album"
if hasattr(spotify_track, "album") and spotify_track.album:
if hasattr(spotify_track.album, "name"):
album_name = spotify_track.album.name
if hasattr(track_object, "album") and track_object.album:
if hasattr(track_object.album, "name"):
album_name = track_object.album.name
else:
album_name = str(spotify_track.album)
album_name = str(track_object.album)
result = {
"id": getattr(spotify_track, "id", None),
"name": getattr(spotify_track, "name", "Unknown Track"),
"id": getattr(track_object, "id", None),
"name": getattr(track_object, "name", "Unknown Track"),
"artists": artists_list,
"album": {"name": album_name},
"duration_ms": getattr(spotify_track, "duration_ms", 0),
"preview_url": getattr(spotify_track, "preview_url", None),
"external_urls": getattr(spotify_track, "external_urls", {}),
"popularity": getattr(spotify_track, "popularity", 0),
"track_number": getattr(spotify_track, "track_number", 1),
"disc_number": getattr(spotify_track, "disc_number", 1),
"duration_ms": getattr(track_object, "duration_ms", 0),
"preview_url": getattr(track_object, "preview_url", None),
"external_urls": getattr(track_object, "external_urls", {}),
"popularity": getattr(track_object, "popularity", 0),
"track_number": getattr(track_object, "track_number", 1),
"disc_number": getattr(track_object, "disc_number", 1),
}
logger.debug(
"Spotify Track converted: name=%s artists=%s",
"Track converted: name=%s artists=%s",
result["name"],
[a["name"] for a in result["artists"]],
)
# Test JSON serialization before returning to catch any remaining issues
try:
json.dumps(result)
logger.debug("Conversion result is JSON serializable")
except Exception as json_error:
logger.error("Conversion result is NOT JSON serializable: %s", json_error)
logger.error("Conversion result content: %r", result)
# Return a safe fallback
return {
"id": f"fallback_{hash(str(spotify_track))}",
"name": str(getattr(spotify_track, "name", "Unknown Track")),
"id": f"fallback_{hash(str(track_object))}",
"name": str(getattr(track_object, "name", "Unknown Track")),
"artists": [{"name": "Unknown Artist"}],
"album": {"name": "Unknown Album"},
"duration_ms": 0,
@ -380,17 +312,72 @@ def spotify_track_object_to_dict(spotify_track) -> Dict[str, Any]:
return result
except Exception as e:
logger.error(f"Error converting track object to dict: {e}")
logger.error(f"Object type: {type(spotify_track)}")
logger.error(f"Object attributes: {dir(spotify_track)}")
logger.error(f"Object type: {type(track_object)}")
logger.error(f"Object attributes: {dir(track_object)}")
return {}
def spotify_track_object_to_dict(spotify_track) -> Dict[str, Any]:
"""Backward-compatible wrapper for `track_object_to_dict`."""
return track_object_to_dict(spotify_track)
def extract_wishlist_track_from_modal_info(track_info: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
Extract a track payload from modal track_info structure.
"""
try:
if not isinstance(track_info, dict):
return None
for key in ("track_data", "track", "metadata_track", "spotify_track"):
if key not in track_info or not track_info[key]:
continue
extracted = track_info[key]
if hasattr(extracted, "__dict__"):
return track_object_to_dict(extracted)
if isinstance(extracted, dict):
return extracted
if track_info.get("name") or track_info.get("title"):
if track_info.get("artists") or track_info.get("artist"):
return ensure_wishlist_track_format(track_info)
if "slskd_result" in track_info and track_info["slskd_result"]:
slskd_result = track_info["slskd_result"]
if hasattr(slskd_result, "artist") and hasattr(slskd_result, "title"):
album_name = getattr(slskd_result, "album", "") or getattr(slskd_result, "title", "Unknown Album")
return {
"id": f"reconstructed_{hash(f'{slskd_result.artist}_{slskd_result.title}')}",
"name": getattr(slskd_result, "title", "Unknown Track"),
"artists": [{"name": getattr(slskd_result, "artist", "Unknown Artist")}],
"album": {"name": album_name, "images": [], "album_type": "single", "total_tracks": 1},
"duration_ms": 0,
"reconstructed": True,
}
logger.warning("Could not find track data in modal info, attempting reconstruction")
return None
except Exception as e:
logger.error(f"Error extracting track from modal info: {e}")
return None
def extract_spotify_track_from_modal_info(track_info: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Backward-compatible wrapper for `extract_wishlist_track_from_modal_info`."""
return extract_wishlist_track_from_modal_info(track_info)
__all__ = [
"sanitize_track_data_for_processing",
"get_track_artist_name",
"ensure_wishlist_track_format",
"ensure_spotify_track_format",
"build_cancelled_task_wishlist_payload",
"build_failed_track_wishlist_context",
"extract_spotify_track_from_modal_info",
"track_object_to_dict",
"spotify_track_object_to_dict",
"extract_wishlist_track_from_modal_info",
"extract_spotify_track_from_modal_info",
]

@ -57,17 +57,16 @@ def check_and_remove_from_wishlist(context: Dict[str, Any], wishlist_service=Non
search_result = get_import_original_search(context) or get_import_search_result(context)
track_id = None
if source == "spotify":
track_id = source_ids.get("track_id") or None
if track_id:
logger.info("[Wishlist] Found %s track ID from source_ids: %s", source_label, track_id)
track_id = source_ids.get("track_id") or None
if track_id:
logger.info("[Wishlist] Found %s track ID from source_ids: %s", source_label, track_id)
elif "wishlist_id" in track_info:
wishlist_id = track_info["wishlist_id"]
logger.info("[Wishlist] Found wishlist_id in context: %s", wishlist_id)
wishlist_tracks = _all_profile_wishlist_tracks(wishlist_service, database=database)
for wishlist_track in wishlist_tracks:
if wishlist_track.get("wishlist_id") == wishlist_id:
track_id = wishlist_track.get("spotify_track_id") or wishlist_track.get("id")
track_id = wishlist_track.get("track_id") or wishlist_track.get("spotify_track_id") or wishlist_track.get("id")
logger.info("[Wishlist] Found track ID from wishlist entry: %s", track_id)
break
@ -93,7 +92,7 @@ def check_and_remove_from_wishlist(context: Dict[str, Any], wishlist_service=Non
else:
wl_artist_name = str(wl_artists[0]).lower()
if wl_name == track_name.lower() and wl_artist_name == artist_name.lower():
track_id = wishlist_track.get("spotify_track_id") or wishlist_track.get("id")
track_id = wishlist_track.get("track_id") or wishlist_track.get("spotify_track_id") or wishlist_track.get("id")
logger.info("[Wishlist] Found fuzzy match - track ID: %s", track_id)
break
@ -152,7 +151,11 @@ def check_and_remove_track_from_wishlist_by_metadata(
wl_artist_name = str(wl_artists[0]).lower()
if wl_name == track_name.lower() and wl_artist_name == primary_artist.lower():
spotify_track_id = wishlist_track.get("spotify_track_id") or wishlist_track.get("id")
spotify_track_id = (
wishlist_track.get("track_id")
or wishlist_track.get("spotify_track_id")
or wishlist_track.get("id")
)
if spotify_track_id:
removed = wishlist_service.mark_track_download_result(spotify_track_id, success=True)
if removed:

@ -46,7 +46,7 @@ def _build_album_images(album: Dict[str, Any]) -> list[dict[str, Any]]:
return []
def _build_spotify_track_data(track: Dict[str, Any], album: Dict[str, Any]) -> Dict[str, Any]:
def _build_track_data(track: Dict[str, Any], album: Dict[str, Any]) -> Dict[str, Any]:
album_images = _build_album_images(album)
return {
"id": track.get("id"),
@ -71,6 +71,11 @@ def _build_spotify_track_data(track: Dict[str, Any], album: Dict[str, Any]) -> D
}
def _build_spotify_track_data(track: Dict[str, Any], album: Dict[str, Any]) -> Dict[str, Any]:
"""Backward-compatible wrapper for `_build_track_data`."""
return _build_track_data(track, album)
def _load_track_spotify_data(track: Dict[str, Any]) -> Dict[str, Any]:
spotify_data = track.get("spotify_data", {})
if isinstance(spotify_data, str):
@ -323,7 +328,7 @@ def remove_album_from_wishlist(
matched = True
if matched:
spotify_track_id = track.get("spotify_track_id") or track.get("id")
spotify_track_id = track.get("track_id") or track.get("spotify_track_id") or track.get("id")
if spotify_track_id:
tracks_to_remove.append(spotify_track_id)
@ -388,7 +393,7 @@ def add_album_track_to_wishlist(
if not track or not artist or not album:
return {"success": False, "error": "Missing required fields: track, artist, album"}, 400
spotify_track_data = _build_spotify_track_data(track, album)
track_data = _build_track_data(track, album)
enhanced_source_context = {
**(source_context or {}),
@ -399,8 +404,8 @@ def add_album_track_to_wishlist(
"added_via": "library_wishlist_modal",
}
success = get_wishlist_service().add_spotify_track_to_wishlist(
spotify_track_data=spotify_track_data,
success = get_wishlist_service().add_track_to_wishlist(
track_data=track_data,
failure_reason="Added from library (incomplete album)",
source_type=source_type,
source_context=enhanced_source_context,

@ -20,7 +20,11 @@ def sanitize_and_dedupe_wishlist_tracks(
for track in raw_tracks:
sanitized_track = sanitizer(track)
spotify_track_id = sanitized_track.get('spotify_track_id') or sanitized_track.get('id')
spotify_track_id = (
sanitized_track.get('track_id')
or sanitized_track.get('spotify_track_id')
or sanitized_track.get('id')
)
if spotify_track_id and spotify_track_id in seen_track_ids:
duplicates_found += 1
@ -45,7 +49,7 @@ def filter_wishlist_tracks_by_category(
for track in tracks:
track_category = classifier(track)
spotify_track_id = track.get('spotify_track_id') or track.get('id')
spotify_track_id = track.get('track_id') or track.get('spotify_track_id') or track.get('id')
if category != track_category:
continue

@ -6,7 +6,7 @@ Wishlist Service - High-level service for managing failed download track wishlis
from typing import Any, Dict, List, Optional
from core.wishlist.payloads import extract_spotify_track_from_modal_info
from core.wishlist.payloads import extract_wishlist_track_from_modal_info
from database.music_database import get_database
from utils.logging_config import get_logger
@ -44,10 +44,10 @@ class WishlistService:
source_context: Additional context (playlist name, album info, etc.)
"""
try:
# Extract Spotify track data from the track_info structure
spotify_track = extract_spotify_track_from_modal_info(track_info)
if not spotify_track:
logger.error("Could not extract Spotify track data from modal info")
# Extract track data from the modal structure.
track_data = extract_wishlist_track_from_modal_info(track_info)
if not track_data:
logger.error("Could not extract track data from modal info")
return False
# Get failure reason from track_info if available
@ -81,7 +81,7 @@ class WishlistService:
# Add to wishlist via database
return self.database.add_to_wishlist(
spotify_track_data=spotify_track,
spotify_track_data=track_data,
failure_reason=failure_reason,
source_type=source_type,
source_info=source_info,
@ -92,32 +92,49 @@ class WishlistService:
logger.error(f"Error adding failed track to wishlist: {e}")
return False
def add_spotify_track_to_wishlist(
def add_track_to_wishlist(
self,
spotify_track_data: Dict[str, Any],
track_data: Dict[str, Any],
failure_reason: str,
source_type: str = "manual",
source_context: Dict[str, Any] = None,
profile_id: int = 1,
) -> bool:
"""
Directly add a Spotify track to the wishlist.
Directly add a track to the wishlist.
Args:
spotify_track_data: Full Spotify track data dictionary
track_data: Full track data dictionary
failure_reason: Reason for the failure
source_type: Source type ('playlist', 'album', 'manual')
source_context: Additional context information
profile_id: Profile to add to
"""
return self.database.add_to_wishlist(
spotify_track_data=spotify_track_data,
spotify_track_data=track_data,
failure_reason=failure_reason,
source_type=source_type,
source_info=source_context or {},
profile_id=profile_id,
)
def add_spotify_track_to_wishlist(
self,
spotify_track_data: Dict[str, Any],
failure_reason: str,
source_type: str = "manual",
source_context: Dict[str, Any] = None,
profile_id: int = 1,
) -> bool:
"""Backward-compatible wrapper for `add_track_to_wishlist`."""
return self.add_track_to_wishlist(
track_data=spotify_track_data,
failure_reason=failure_reason,
source_type=source_type,
source_context=source_context,
profile_id=profile_id,
)
def get_wishlist_tracks_for_download(
self,
limit: Optional[int] = None,
@ -132,30 +149,61 @@ class WishlistService:
formatted_tracks = []
for wishlist_track in wishlist_tracks:
spotify_data = wishlist_track["spotify_data"]
track_data = wishlist_track.get("track_data") or wishlist_track.get("spotify_data") or {}
if isinstance(track_data, str):
try:
import json
track_data = json.loads(track_data)
except Exception:
track_data = {}
if not isinstance(track_data, dict):
track_data = {}
track_id = wishlist_track.get("spotify_track_id") or wishlist_track.get("id") or track_data.get("id")
track_name = track_data.get("name", "Unknown Track")
artists = track_data.get("artists", [])
album = track_data.get("album") if isinstance(track_data.get("album"), dict) else {}
if isinstance(artists, list) and artists:
first_artist = artists[0]
if isinstance(first_artist, dict):
artist_name = first_artist.get("name", "Unknown Artist")
else:
artist_name = str(first_artist)
else:
artist_name = "Unknown Artist"
album_name = album.get("name", "") if isinstance(album, dict) else str(album) if album else ""
# Create a track object similar to what download modals expect
formatted_track = {
"wishlist_id": wishlist_track["id"],
"track_id": track_id,
"track_data": track_data,
"track_name": track_name,
"artist_name": artist_name,
"album_name": album_name,
"provider": (
track_data.get("provider") or track_data.get("source")
if isinstance(track_data, dict)
else None
),
"spotify_track_id": wishlist_track["spotify_track_id"],
"spotify_data": spotify_data,
"spotify_data": track_data,
"failure_reason": wishlist_track["failure_reason"],
"retry_count": wishlist_track["retry_count"],
"date_added": wishlist_track["date_added"],
"last_attempted": wishlist_track["last_attempted"],
"source_type": wishlist_track["source_type"],
"source_info": wishlist_track["source_info"],
# Format for modal compatibility (similar to Spotify Track objects)
"id": spotify_data.get("id"),
"name": spotify_data.get("name", "Unknown Track"),
"artists": spotify_data.get("artists", []),
"album": spotify_data.get("album") or {},
"duration_ms": spotify_data.get("duration_ms", 0),
"preview_url": spotify_data.get("preview_url"),
"external_urls": spotify_data.get("external_urls", {}),
"popularity": spotify_data.get("popularity", 0),
"track_number": spotify_data.get("track_number", 1),
"disc_number": spotify_data.get("disc_number", 1),
"id": track_id,
"name": track_name,
"artists": artists,
"album": album or {},
"duration_ms": track_data.get("duration_ms", 0) if isinstance(track_data, dict) else 0,
"preview_url": track_data.get("preview_url") if isinstance(track_data, dict) else None,
"external_urls": track_data.get("external_urls", {}) if isinstance(track_data, dict) else {},
"popularity": track_data.get("popularity", 0) if isinstance(track_data, dict) else 0,
"track_number": track_data.get("track_number", 1) if isinstance(track_data, dict) else 1,
"disc_number": track_data.get("disc_number", 1) if isinstance(track_data, dict) else 1,
}
formatted_tracks.append(formatted_track)
@ -197,11 +245,15 @@ class WishlistService:
return self.database.clear_wishlist(profile_id=profile_id)
def check_track_in_wishlist(self, spotify_track_id: str) -> bool:
"""Check if a track exists in the wishlist by Spotify track ID"""
"""Check if a track exists in the wishlist by track ID."""
try:
wishlist_tracks = self.get_wishlist_tracks_for_download()
for track in wishlist_tracks:
if track.get("spotify_track_id") == spotify_track_id or track.get("id") == spotify_track_id:
if (
track.get("track_id") == spotify_track_id
or track.get("spotify_track_id") == spotify_track_id
or track.get("id") == spotify_track_id
):
return True
return False
except Exception as e:
@ -221,10 +273,9 @@ class WishlistService:
normalized_artist_name = artist_name.lower().strip()
for wl_track in wishlist_tracks:
wl_name = wl_track.get("name", "").lower().strip()
wl_name = (wl_track.get("track_name") or wl_track.get("name") or "").lower().strip()
wl_artists = wl_track.get("artists", [])
# Extract artist name from wishlist track
wl_artist_name = ""
if wl_artists:
if isinstance(wl_artists[0], dict):
@ -267,7 +318,16 @@ class WishlistService:
# Keep track of recent failures (last 5)
if len(recent_failures) < 5:
spotify_data = track["spotify_data"]
spotify_data = track.get("track_data") or track["spotify_data"] or {}
if isinstance(spotify_data, str):
try:
import json
spotify_data = json.loads(spotify_data)
except Exception:
spotify_data = {}
if not isinstance(spotify_data, dict):
spotify_data = {}
recent_failures.append(
{
"name": spotify_data.get("name", "Unknown Track"),

@ -6750,11 +6750,20 @@ class MusicDatabase:
# Wishlist management methods
def add_to_wishlist(self, spotify_track_data: Dict[str, Any], failure_reason: str = "Download failed",
source_type: str = "unknown", source_info: Dict[str, Any] = None,
profile_id: int = 1) -> bool:
def add_to_wishlist(
self,
spotify_track_data: Dict[str, Any] = None,
failure_reason: str = "Download failed",
source_type: str = "unknown",
source_info: Dict[str, Any] = None,
profile_id: int = 1,
track_data: Dict[str, Any] = None,
) -> bool:
"""Add a failed track to the wishlist for retry"""
try:
if track_data is not None and spotify_track_data is None:
spotify_track_data = track_data
with self._get_connection() as conn:
cursor = conn.cursor()

@ -7,6 +7,7 @@ from core.wishlist.classification import classify_wishlist_track
"spotify_data,expected",
[
({"album": {"album_type": "single"}}, "singles"),
({"track_data": {"album": {"album_type": "single"}}}, "singles"),
({"album": {"album_type": "ep"}}, "singles"),
({"album": {"album_type": "album"}}, "albums"),
({"album": {"album_type": "compilation"}}, "albums"),

@ -58,6 +58,20 @@ def test_ensure_spotify_track_format_builds_webui_shape():
assert out["source"] == "webui_modal"
def test_ensure_wishlist_track_format_aliases_the_spotify_helper():
track = {
"name": "Song",
"artist": "Artist One",
"album": {"name": "Album One"},
}
out = payloads.ensure_wishlist_track_format(track)
assert out["name"] == "Song"
assert out["artists"] == [{"name": "Artist One"}]
assert out["album"]["name"] == "Album One"
def test_extract_spotify_track_from_modal_info_converts_trackresult_like_object():
track_info = {
"spotify_track": SimpleNamespace(
@ -90,3 +104,20 @@ def test_extract_spotify_track_from_modal_info_reconstructs_from_slskd_result():
assert out["name"] == "Song Three"
assert out["artists"] == [{"name": "Artist Three"}]
assert out["album"]["name"] == "Album Three"
def test_extract_wishlist_track_from_modal_info_uses_track_data_key():
track_info = {
"track_data": {
"id": "track-1",
"name": "Song Four",
"artists": [{"name": "Artist Four"}],
"album": {"name": "Album Four"},
}
}
out = payloads.extract_wishlist_track_from_modal_info(track_info)
assert out["id"] == "track-1"
assert out["name"] == "Song Four"
assert out["artists"] == [{"name": "Artist Four"}]

@ -83,6 +83,40 @@ def test_check_and_remove_from_wishlist_uses_spotify_source_id():
assert wishlist_service.removed == [("sp-track-1", True, None, 1)]
def test_check_and_remove_from_wishlist_uses_non_spotify_source_id():
fake_db = SimpleNamespace(get_all_profiles=lambda: [{"id": 1}])
wishlist_service = _FakeWishlistService(
[
{
"wishlist_id": 11,
"spotify_track_id": "dz-track-1",
"id": "dz-track-1",
"name": "Song One",
"artists": [{"name": "Artist One"}],
}
]
)
context = {
"source": "deezer",
"track_info": {
"deezer_track_id": "dz-track-1",
"name": "Song One",
"artists": [{"name": "Artist One"}],
},
"search_result": {},
"original_search_result": {},
}
resolution.check_and_remove_from_wishlist(
context,
wishlist_service=wishlist_service,
database=fake_db,
)
assert wishlist_service.removed == [("dz-track-1", True, None, 1)]
def test_check_and_remove_from_wishlist_uses_wishlist_id_lookup():
fake_db = SimpleNamespace(get_all_profiles=lambda: [{"id": 1}])
wishlist_service = _FakeWishlistService(

@ -137,6 +137,10 @@ class _FakeWishlistService:
self.removed.append((spotify_track_id, profile_id))
return True
def add_track_to_wishlist(self, **kwargs):
self.add_calls.append(kwargs)
return True
def add_spotify_track_to_wishlist(self, **kwargs):
self.add_calls.append(kwargs)
return True
@ -479,11 +483,11 @@ def test_add_album_track_to_wishlist_builds_spotify_payload_and_merges_context()
"album_name": "Album One",
"added_via": "library_wishlist_modal",
}
assert add_call["spotify_track_data"]["album"]["images"] == [
assert add_call["track_data"]["album"]["images"] == [
{"url": "https://example.test/cover.jpg", "height": 640, "width": 640}
]
assert add_call["spotify_track_data"]["duration_ms"] == 1234
assert add_call["spotify_track_data"]["explicit"] is True
assert add_call["track_data"]["duration_ms"] == 1234
assert add_call["track_data"]["explicit"] is True
def test_set_wishlist_cycle_rejects_invalid_cycle():

@ -123,7 +123,24 @@ def test_get_wishlist_tracks_for_download_formats_modal_shape():
assert formatted_tracks == [
{
"wishlist_id": "wl-1",
"track_id": "sp-1",
"spotify_track_id": "sp-1",
"track_data": {
"id": "sp-1",
"name": "Song One",
"artists": [{"name": "Artist One"}],
"album": {"name": "Album One"},
"duration_ms": 321,
"preview_url": "https://example.test/preview",
"external_urls": {"spotify": "https://open.spotify.com/track/sp-1"},
"popularity": 88,
"track_number": 7,
"disc_number": 2,
},
"track_name": "Song One",
"artist_name": "Artist One",
"album_name": "Album One",
"provider": None,
"spotify_data": {
"id": "sp-1",
"name": "Song One",

Loading…
Cancel
Save