mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
471 lines
19 KiB
471 lines
19 KiB
"""Wishlist controller helpers for Flask-style endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import threading
|
|
from dataclasses import dataclass
|
|
from typing import Any, Callable, Dict
|
|
|
|
from core.wishlist.reporting import build_wishlist_stats_payload
|
|
from core.wishlist.selection import prepare_wishlist_tracks_for_display
|
|
from core.wishlist.service import get_wishlist_service
|
|
from core.wishlist.state import get_wishlist_cycle as _get_wishlist_cycle
|
|
from core.wishlist.state import set_wishlist_cycle as _set_wishlist_cycle
|
|
from utils.logging_config import get_logger
|
|
|
|
|
|
module_logger = get_logger("wishlist.routes")
|
|
logger = module_logger
|
|
|
|
|
|
@dataclass
|
|
class WishlistRouteRuntime:
|
|
"""Dependencies needed to service wishlist HTTP endpoints outside the controller."""
|
|
|
|
get_music_database: Callable[[], Any]
|
|
profile_id: int
|
|
download_batches: Dict[str, Dict[str, Any]]
|
|
download_tasks: Dict[str, Dict[str, Any]]
|
|
tasks_lock: Any
|
|
is_wishlist_actually_processing: Callable[[], bool]
|
|
reset_wishlist_processing_state: Callable[[], None]
|
|
add_activity_item: Callable[[Any, Any, Any, Any], Any]
|
|
active_server: str
|
|
logger: Any = module_logger
|
|
get_next_run_seconds: Callable[[str], int] | None = None
|
|
thread_factory: Callable[..., Any] = threading.Thread
|
|
|
|
|
|
def _build_album_images(album: Dict[str, Any]) -> list[dict[str, Any]]:
|
|
if isinstance(album.get("images"), list) and album.get("images"):
|
|
return list(album["images"])
|
|
if album.get("image_url"):
|
|
return [{"url": album["image_url"], "height": 640, "width": 640}]
|
|
return []
|
|
|
|
|
|
def _build_track_data(track: Dict[str, Any], album: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Project a wishlist-modal add payload into the canonical
|
|
wishlist track shape.
|
|
|
|
Pre-fix this used default values (``track_number=1``,
|
|
``disc_number=1``, ``total_tracks=1``, ``release_date=''``) when
|
|
the upstream UI omitted a field. That silently poisoned every
|
|
wishlist row added from the library "add to wishlist" modal:
|
|
track_number locked to 1 regardless of source position, year
|
|
dropped from folder paths because release_date was empty. The
|
|
library modal flow is what's used for "add this album's missing
|
|
tracks to wishlist" and "add this playlist to wishlist" bulk
|
|
actions — the most common user path, so the regression was
|
|
everywhere.
|
|
|
|
Now preserve missing values explicitly (None for numeric
|
|
positions, omit-or-empty for release_date) so the downstream
|
|
import pipeline can detect-and-recover via
|
|
``core/imports/track_number.py:resolve_track_number`` instead
|
|
of locking to 1.
|
|
"""
|
|
album_images = _build_album_images(album)
|
|
return {
|
|
"id": track.get("id"),
|
|
"name": track.get("name"),
|
|
"artists": track.get("artists", []),
|
|
"album": {
|
|
"id": album.get("id"),
|
|
"name": album.get("name"),
|
|
"artists": album.get("artists", []),
|
|
"images": album_images,
|
|
"album_type": album.get("album_type", "album"),
|
|
# release_date stays as whatever the upstream sent
|
|
# (including '' when truly unknown). Path template
|
|
# gracefully omits the year when empty; we don't fake
|
|
# a date.
|
|
"release_date": album.get("release_date", ""),
|
|
# total_tracks=None preserves "we don't know"; UI uses
|
|
# this for category classification + path math. Pre-fix
|
|
# default of 1 mislabelled multi-track albums as singles.
|
|
"total_tracks": album.get("total_tracks"),
|
|
},
|
|
"duration_ms": track.get("duration_ms", 0),
|
|
# Numeric positions: None when missing, not 1.
|
|
"track_number": track.get("track_number"),
|
|
"disc_number": track.get("disc_number"),
|
|
"explicit": track.get("explicit", False),
|
|
"popularity": track.get("popularity", 0),
|
|
"preview_url": track.get("preview_url"),
|
|
"external_urls": track.get("external_urls", {}),
|
|
}
|
|
|
|
|
|
def _build_spotify_track_data(track: Dict[str, Any], album: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Backward-compatible wrapper for `_build_track_data`."""
|
|
return _build_track_data(track, album)
|
|
|
|
|
|
def _load_track_spotify_data(track: Dict[str, Any]) -> Dict[str, Any]:
|
|
spotify_data = track.get("spotify_data", {})
|
|
if isinstance(spotify_data, str):
|
|
try:
|
|
spotify_data = json.loads(spotify_data)
|
|
except Exception:
|
|
spotify_data = {}
|
|
if not isinstance(spotify_data, dict):
|
|
spotify_data = {}
|
|
return spotify_data
|
|
|
|
|
|
def _album_lookup_id(spotify_data: Dict[str, Any]) -> tuple[str | None, Dict[str, Any]]:
|
|
album_data = spotify_data.get("album") or {}
|
|
if not isinstance(album_data, dict):
|
|
album_data = {}
|
|
|
|
track_album_id = album_data.get("id")
|
|
if not track_album_id:
|
|
album_name = album_data.get("name", "Unknown Album")
|
|
artists = spotify_data.get("artists", [])
|
|
if isinstance(artists, list) and artists and isinstance(artists[0], dict):
|
|
artist_name = artists[0].get("name", "Unknown Artist")
|
|
elif isinstance(artists, list) and artists and isinstance(artists[0], str):
|
|
artist_name = artists[0]
|
|
else:
|
|
artist_name = "Unknown Artist"
|
|
custom_id = f"{album_name}_{artist_name}"
|
|
track_album_id = re.sub(r"[^a-zA-Z0-9\s_-]", "", custom_id)
|
|
track_album_id = re.sub(r"\s+", "_", track_album_id).lower()
|
|
|
|
return track_album_id, album_data
|
|
|
|
|
|
def process_wishlist_api(
|
|
runtime: WishlistRouteRuntime,
|
|
*,
|
|
start_processing: Callable[[], None],
|
|
) -> tuple[Dict[str, Any], int]:
|
|
"""Trigger wishlist processing in the background."""
|
|
try:
|
|
if runtime.is_wishlist_actually_processing():
|
|
return {"success": False, "error": "Wishlist processing already in progress"}, 409
|
|
|
|
thread = runtime.thread_factory(target=start_processing, daemon=True)
|
|
thread.start()
|
|
return {"success": True, "message": "Wishlist processing started"}, 200
|
|
except Exception as exc:
|
|
runtime.logger.error("Error starting wishlist processing: %s", exc)
|
|
return {"success": False, "error": str(exc)}, 500
|
|
|
|
|
|
def get_wishlist_count(runtime: WishlistRouteRuntime) -> tuple[Dict[str, Any], int]:
|
|
"""Return the current wishlist count for the active profile."""
|
|
try:
|
|
count = get_wishlist_service().get_wishlist_count(profile_id=runtime.profile_id)
|
|
return {"count": count}, 200
|
|
except Exception as exc:
|
|
runtime.logger.error("Error getting wishlist count: %s", exc)
|
|
return {"error": str(exc)}, 500
|
|
|
|
|
|
def get_wishlist_stats(runtime: WishlistRouteRuntime) -> tuple[Dict[str, Any], int]:
|
|
"""Return wishlist statistics for the UI."""
|
|
try:
|
|
raw_tracks = get_wishlist_service().get_wishlist_tracks_for_download(profile_id=runtime.profile_id)
|
|
next_run_in_seconds = runtime.get_next_run_seconds("process_wishlist") if runtime.get_next_run_seconds else 0
|
|
is_processing = runtime.is_wishlist_actually_processing()
|
|
current_cycle = _get_wishlist_cycle(runtime.get_music_database)
|
|
|
|
payload = build_wishlist_stats_payload(
|
|
raw_tracks,
|
|
next_run_in_seconds=next_run_in_seconds,
|
|
is_auto_processing=is_processing,
|
|
current_cycle=current_cycle,
|
|
)
|
|
return payload, 200
|
|
except Exception as exc:
|
|
runtime.logger.error("Error getting wishlist stats: %s", exc)
|
|
return {"error": str(exc)}, 500
|
|
|
|
|
|
def get_wishlist_cycle(runtime: WishlistRouteRuntime) -> tuple[Dict[str, Any], int]:
|
|
"""Return the current wishlist cycle."""
|
|
try:
|
|
cycle = _get_wishlist_cycle(runtime.get_music_database)
|
|
return {"cycle": cycle}, 200
|
|
except Exception as exc:
|
|
runtime.logger.error("Error getting wishlist cycle: %s", exc)
|
|
return {"error": str(exc)}, 500
|
|
|
|
|
|
def set_wishlist_cycle(runtime: WishlistRouteRuntime, cycle: str) -> tuple[Dict[str, Any], int]:
|
|
"""Persist the wishlist cycle."""
|
|
try:
|
|
if cycle not in ["albums", "singles"]:
|
|
return {"error": "Invalid cycle. Must be 'albums' or 'singles'"}, 400
|
|
|
|
_set_wishlist_cycle(runtime.get_music_database, cycle)
|
|
runtime.logger.info("Wishlist cycle set to: %s", cycle)
|
|
return {"success": True, "cycle": cycle}, 200
|
|
except Exception as exc:
|
|
runtime.logger.error("Error setting wishlist cycle: %s", exc)
|
|
return {"error": str(exc)}, 500
|
|
|
|
|
|
def get_wishlist_tracks(
|
|
runtime: WishlistRouteRuntime,
|
|
*,
|
|
category: str | None = None,
|
|
limit: int | None = None,
|
|
) -> tuple[Dict[str, Any], int]:
|
|
"""Return wishlist tracks for the modal UI."""
|
|
try:
|
|
db = runtime.get_music_database()
|
|
|
|
with runtime.tasks_lock:
|
|
wishlist_batch_active = any(
|
|
batch.get("playlist_id") == "wishlist" and batch.get("phase") in ["analysis", "downloading"]
|
|
for batch in runtime.download_batches.values()
|
|
)
|
|
|
|
if not wishlist_batch_active:
|
|
duplicates_removed = db.remove_wishlist_duplicates(profile_id=runtime.profile_id)
|
|
if duplicates_removed > 0:
|
|
runtime.logger.warning("Cleaned %s duplicate tracks from wishlist", duplicates_removed)
|
|
else:
|
|
runtime.logger.warning("Skipping wishlist duplicate cleanup - download in progress")
|
|
|
|
raw_tracks = get_wishlist_service().get_wishlist_tracks_for_download(profile_id=runtime.profile_id)
|
|
prepared = prepare_wishlist_tracks_for_display(raw_tracks, category=category, limit=limit)
|
|
|
|
if prepared["duplicates_found"] > 0:
|
|
runtime.logger.warning(
|
|
"[API-Wishlist-Tracks] Found and removed %s duplicate tracks during sanitization",
|
|
prepared["duplicates_found"],
|
|
)
|
|
|
|
if category:
|
|
runtime.logger.info(
|
|
"Wishlist filter: %s/%s tracks in '%s' category (limit: %s)",
|
|
len(prepared["tracks"]),
|
|
prepared["total"],
|
|
category,
|
|
limit or "none",
|
|
)
|
|
return {"tracks": prepared["tracks"], "category": category, "total": prepared["total"]}, 200
|
|
|
|
return {"tracks": prepared["tracks"], "total": prepared["total"]}, 200
|
|
except Exception as exc:
|
|
runtime.logger.error("Error getting wishlist tracks: %s", exc)
|
|
return {"error": str(exc)}, 500
|
|
|
|
|
|
def clear_wishlist(runtime: WishlistRouteRuntime) -> tuple[Dict[str, Any], int]:
|
|
"""Clear the wishlist and cancel active wishlist batches."""
|
|
try:
|
|
success = get_wishlist_service().clear_wishlist(profile_id=runtime.profile_id)
|
|
|
|
if success:
|
|
cancelled_count = 0
|
|
with runtime.tasks_lock:
|
|
for _batch_id, batch_data in runtime.download_batches.items():
|
|
if batch_data.get("playlist_id") == "wishlist" and batch_data.get("phase") not in (
|
|
"complete",
|
|
"error",
|
|
"cancelled",
|
|
):
|
|
batch_data["phase"] = "cancelled"
|
|
for task_id in batch_data.get("queue", []):
|
|
if task_id in runtime.download_tasks and runtime.download_tasks[task_id]["status"] not in (
|
|
"completed",
|
|
"failed",
|
|
"not_found",
|
|
"cancelled",
|
|
):
|
|
runtime.download_tasks[task_id]["status"] = "cancelled"
|
|
cancelled_count += 1
|
|
|
|
runtime.reset_wishlist_processing_state()
|
|
|
|
if cancelled_count > 0:
|
|
runtime.logger.warning("[Wishlist Clear] Cancelled %s active wishlist downloads", cancelled_count)
|
|
runtime.add_activity_item("", "Wishlist Cleared", f"Wishlist cleared and {cancelled_count} downloads cancelled", "Now")
|
|
|
|
return {
|
|
"success": True,
|
|
"message": "Wishlist cleared successfully",
|
|
"cancelled_downloads": cancelled_count,
|
|
}, 200
|
|
|
|
return {"success": False, "error": "Failed to clear wishlist"}, 500
|
|
except Exception as exc:
|
|
runtime.logger.error("Error clearing wishlist: %s", exc)
|
|
return {"success": False, "error": str(exc)}, 500
|
|
|
|
|
|
def remove_track_from_wishlist(
|
|
runtime: WishlistRouteRuntime,
|
|
spotify_track_id: str | None,
|
|
) -> tuple[Dict[str, Any], int]:
|
|
"""Remove a single track from the wishlist."""
|
|
try:
|
|
if not spotify_track_id:
|
|
return {"success": False, "error": "No spotify_track_id provided"}, 400
|
|
|
|
success = get_wishlist_service().remove_track_from_wishlist(
|
|
spotify_track_id,
|
|
profile_id=runtime.profile_id,
|
|
)
|
|
|
|
if success:
|
|
runtime.logger.info("Successfully removed track from wishlist: %s", spotify_track_id)
|
|
return {"success": True, "message": "Track removed from wishlist"}, 200
|
|
|
|
runtime.logger.warning("Failed to remove track from wishlist: %s", spotify_track_id)
|
|
return {"success": False, "error": "Track not found in wishlist"}, 404
|
|
except Exception as exc:
|
|
runtime.logger.error("Error removing track from wishlist: %s", exc)
|
|
return {"success": False, "error": str(exc)}, 500
|
|
|
|
|
|
def remove_album_from_wishlist(
|
|
runtime: WishlistRouteRuntime,
|
|
*,
|
|
album_id: str | None = None,
|
|
album_name_filter: str | None = None,
|
|
) -> tuple[Dict[str, Any], int]:
|
|
"""Remove every wishlist track that belongs to the selected album."""
|
|
try:
|
|
if not album_id and not album_name_filter:
|
|
return {"success": False, "error": "No album_id or album_name provided"}, 400
|
|
|
|
wishlist_service = get_wishlist_service()
|
|
all_tracks = wishlist_service.get_wishlist_tracks_for_download(profile_id=runtime.profile_id)
|
|
|
|
tracks_to_remove = []
|
|
for track in all_tracks:
|
|
spotify_data = _load_track_spotify_data(track)
|
|
track_album_id, album_data = _album_lookup_id(spotify_data)
|
|
|
|
matched = False
|
|
if album_id and track_album_id == album_id:
|
|
matched = True
|
|
elif album_name_filter:
|
|
track_album_name = album_data.get("name", "")
|
|
if isinstance(spotify_data.get("album"), str):
|
|
track_album_name = spotify_data["album"]
|
|
if track_album_name and track_album_name.lower().strip() == album_name_filter.lower().strip():
|
|
matched = True
|
|
|
|
if matched:
|
|
spotify_track_id = track.get("track_id") or track.get("spotify_track_id") or track.get("id")
|
|
if spotify_track_id:
|
|
tracks_to_remove.append(spotify_track_id)
|
|
|
|
removed_count = 0
|
|
album_remove_pid = runtime.profile_id
|
|
for spotify_track_id in tracks_to_remove:
|
|
if wishlist_service.remove_track_from_wishlist(spotify_track_id, profile_id=album_remove_pid):
|
|
removed_count += 1
|
|
|
|
if removed_count > 0:
|
|
runtime.logger.info("Successfully removed %s tracks from album %s", removed_count, album_id)
|
|
return {
|
|
"success": True,
|
|
"message": f"Removed {removed_count} track(s) from wishlist",
|
|
"removed_count": removed_count,
|
|
}, 200
|
|
|
|
runtime.logger.warning("No tracks found for album %s", album_id)
|
|
return {"success": False, "error": "No tracks found for this album"}, 404
|
|
except Exception as exc:
|
|
runtime.logger.error("Error removing album from wishlist: %s", exc)
|
|
return {"success": False, "error": str(exc)}, 500
|
|
|
|
|
|
def remove_batch_from_wishlist(
|
|
runtime: WishlistRouteRuntime,
|
|
spotify_track_ids,
|
|
) -> tuple[Dict[str, Any], int]:
|
|
"""Remove a batch of tracks from the wishlist."""
|
|
try:
|
|
if not spotify_track_ids or not isinstance(spotify_track_ids, list):
|
|
return {"success": False, "error": "Missing or invalid spotify_track_ids"}, 400
|
|
|
|
removed = 0
|
|
pid = runtime.profile_id
|
|
for track_id in spotify_track_ids:
|
|
if get_wishlist_service().remove_track_from_wishlist(track_id, profile_id=pid):
|
|
removed += 1
|
|
|
|
runtime.logger.info("Batch removed %s track(s) from wishlist", removed)
|
|
return {
|
|
"success": True,
|
|
"removed": removed,
|
|
"message": f"Removed {removed} track{'s' if removed != 1 else ''} from wishlist",
|
|
}, 200
|
|
except Exception as exc:
|
|
runtime.logger.error("Error batch removing from wishlist: %s", exc)
|
|
return {"success": False, "error": str(exc)}, 500
|
|
|
|
|
|
def add_album_track_to_wishlist(
|
|
runtime: WishlistRouteRuntime,
|
|
*,
|
|
track: Dict[str, Any] | None,
|
|
artist: Dict[str, Any] | None,
|
|
album: Dict[str, Any] | None,
|
|
source_type: str = "album",
|
|
source_context: Dict[str, Any] | None = None,
|
|
) -> tuple[Dict[str, Any], int]:
|
|
"""Add a single album track to the wishlist."""
|
|
try:
|
|
if not track or not artist or not album:
|
|
return {"success": False, "error": "Missing required fields: track, artist, album"}, 400
|
|
|
|
track_data = _build_track_data(track, album)
|
|
|
|
enhanced_source_context = {
|
|
**(source_context or {}),
|
|
"artist_id": artist.get("id"),
|
|
"artist_name": artist.get("name"),
|
|
"album_id": album.get("id"),
|
|
"album_name": album.get("name"),
|
|
"added_via": "library_wishlist_modal",
|
|
}
|
|
|
|
success = get_wishlist_service().add_track_to_wishlist(
|
|
track_data=track_data,
|
|
failure_reason="Added from library (incomplete album)",
|
|
source_type=source_type,
|
|
source_context=enhanced_source_context,
|
|
profile_id=runtime.profile_id,
|
|
)
|
|
|
|
if success:
|
|
runtime.logger.info("Added track '%s' by '%s' to wishlist", track.get("name"), artist.get("name"))
|
|
return {"success": True, "message": f"Added '{track.get('name')}' to wishlist"}, 200
|
|
|
|
runtime.logger.error("Failed to add track '%s' to wishlist", track.get("name"))
|
|
return {"success": False, "error": "Failed to add track to wishlist"}, 200
|
|
except Exception as exc:
|
|
runtime.logger.error("Error adding track to wishlist: %s", exc)
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
return {"success": False, "error": str(exc)}, 500
|
|
|
|
|
|
__all__ = [
|
|
"WishlistRouteRuntime",
|
|
"process_wishlist_api",
|
|
"get_wishlist_count",
|
|
"get_wishlist_stats",
|
|
"get_wishlist_cycle",
|
|
"set_wishlist_cycle",
|
|
"get_wishlist_tracks",
|
|
"clear_wishlist",
|
|
"remove_track_from_wishlist",
|
|
"remove_album_from_wishlist",
|
|
"remove_batch_from_wishlist",
|
|
"add_album_track_to_wishlist",
|
|
]
|