mirror of https://github.com/Nezreka/SoulSync.git
- extract the remaining wishlist endpoint behavior from web_server.py into core/wishlist/routes.py - keep web_server.py as a thin Flask adapter around the new route helpers - add tests that cover wishlist counts, stats, track listing, clear/remove flows, cycle updates, and album-track addspull/400/head
parent
f32fc9d56e
commit
d2af9f8bdf
@ -0,0 +1,440 @@
|
||||
"""Wishlist controller helpers for Flask-style endpoints."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
import threading
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Callable, Dict
|
||||
|
||||
from core.wishlist.reporting import build_wishlist_stats_payload
|
||||
from core.wishlist.selection import prepare_wishlist_tracks_for_display
|
||||
from core.wishlist.state import get_wishlist_cycle as _get_wishlist_cycle
|
||||
from core.wishlist.state import set_wishlist_cycle as _set_wishlist_cycle
|
||||
|
||||
|
||||
@dataclass
|
||||
class WishlistRouteRuntime:
|
||||
"""Dependencies needed to service wishlist HTTP endpoints outside the controller."""
|
||||
|
||||
get_wishlist_service: Callable[[], Any]
|
||||
get_music_database: Callable[[], Any]
|
||||
get_current_profile_id: Callable[[], int]
|
||||
download_batches: Dict[str, Dict[str, Any]]
|
||||
download_tasks: Dict[str, Dict[str, Any]]
|
||||
tasks_lock: Any
|
||||
is_wishlist_auto_processing_flag: Callable[[], bool]
|
||||
is_wishlist_actually_processing: Callable[[], bool]
|
||||
reset_wishlist_processing_state: Callable[[], None]
|
||||
add_activity_item: Callable[[Any, Any, Any, Any], Any]
|
||||
logger: Any
|
||||
active_server: str
|
||||
get_next_run_seconds: Callable[[str], int] | None = None
|
||||
thread_factory: Callable[..., Any] = threading.Thread
|
||||
|
||||
|
||||
def _build_album_images(album: Dict[str, Any]) -> list[dict[str, Any]]:
|
||||
if isinstance(album.get("images"), list) and album.get("images"):
|
||||
return list(album["images"])
|
||||
if album.get("image_url"):
|
||||
return [{"url": album["image_url"], "height": 640, "width": 640}]
|
||||
return []
|
||||
|
||||
|
||||
def _build_spotify_track_data(track: Dict[str, Any], album: Dict[str, Any]) -> Dict[str, Any]:
|
||||
album_images = _build_album_images(album)
|
||||
return {
|
||||
"id": track.get("id"),
|
||||
"name": track.get("name"),
|
||||
"artists": track.get("artists", []),
|
||||
"album": {
|
||||
"id": album.get("id"),
|
||||
"name": album.get("name"),
|
||||
"artists": album.get("artists", []),
|
||||
"images": album_images,
|
||||
"album_type": album.get("album_type", "album"),
|
||||
"release_date": album.get("release_date", ""),
|
||||
"total_tracks": album.get("total_tracks", 1),
|
||||
},
|
||||
"duration_ms": track.get("duration_ms", 0),
|
||||
"track_number": track.get("track_number", 1),
|
||||
"disc_number": track.get("disc_number", 1),
|
||||
"explicit": track.get("explicit", False),
|
||||
"popularity": track.get("popularity", 0),
|
||||
"preview_url": track.get("preview_url"),
|
||||
"external_urls": track.get("external_urls", {}),
|
||||
}
|
||||
|
||||
|
||||
def _load_track_spotify_data(track: Dict[str, Any]) -> Dict[str, Any]:
|
||||
spotify_data = track.get("spotify_data", {})
|
||||
if isinstance(spotify_data, str):
|
||||
try:
|
||||
spotify_data = json.loads(spotify_data)
|
||||
except Exception:
|
||||
spotify_data = {}
|
||||
if not isinstance(spotify_data, dict):
|
||||
spotify_data = {}
|
||||
return spotify_data
|
||||
|
||||
|
||||
def _album_lookup_id(spotify_data: Dict[str, Any]) -> tuple[str | None, Dict[str, Any]]:
|
||||
album_data = spotify_data.get("album") or {}
|
||||
if not isinstance(album_data, dict):
|
||||
album_data = {}
|
||||
|
||||
track_album_id = album_data.get("id")
|
||||
if not track_album_id:
|
||||
album_name = album_data.get("name", "Unknown Album")
|
||||
artists = spotify_data.get("artists", [])
|
||||
if isinstance(artists, list) and artists and isinstance(artists[0], dict):
|
||||
artist_name = artists[0].get("name", "Unknown Artist")
|
||||
elif isinstance(artists, list) and artists and isinstance(artists[0], str):
|
||||
artist_name = artists[0]
|
||||
else:
|
||||
artist_name = "Unknown Artist"
|
||||
custom_id = f"{album_name}_{artist_name}"
|
||||
track_album_id = re.sub(r"[^a-zA-Z0-9\s_-]", "", custom_id)
|
||||
track_album_id = re.sub(r"\s+", "_", track_album_id).lower()
|
||||
|
||||
return track_album_id, album_data
|
||||
|
||||
|
||||
def process_wishlist_api(
|
||||
runtime: WishlistRouteRuntime,
|
||||
*,
|
||||
start_processing: Callable[[], None],
|
||||
) -> tuple[Dict[str, Any], int]:
|
||||
"""Trigger wishlist processing in the background."""
|
||||
try:
|
||||
if runtime.is_wishlist_auto_processing_flag():
|
||||
return {"success": False, "error": "Wishlist processing already in progress"}, 409
|
||||
|
||||
thread = runtime.thread_factory(target=start_processing, daemon=True)
|
||||
thread.start()
|
||||
return {"success": True, "message": "Wishlist processing started"}, 200
|
||||
except Exception as exc:
|
||||
runtime.logger.error("Error starting wishlist processing: %s", exc)
|
||||
return {"success": False, "error": str(exc)}, 500
|
||||
|
||||
|
||||
def get_wishlist_count(runtime: WishlistRouteRuntime) -> tuple[Dict[str, Any], int]:
|
||||
"""Return the current wishlist count for the active profile."""
|
||||
try:
|
||||
wishlist_service = runtime.get_wishlist_service()
|
||||
count = wishlist_service.get_wishlist_count(profile_id=runtime.get_current_profile_id())
|
||||
return {"count": count}, 200
|
||||
except Exception as exc:
|
||||
runtime.logger.error("Error getting wishlist count: %s", exc)
|
||||
return {"error": str(exc)}, 500
|
||||
|
||||
|
||||
def get_wishlist_stats(runtime: WishlistRouteRuntime) -> tuple[Dict[str, Any], int]:
|
||||
"""Return wishlist statistics for the UI."""
|
||||
try:
|
||||
wishlist_service = runtime.get_wishlist_service()
|
||||
raw_tracks = wishlist_service.get_wishlist_tracks_for_download(profile_id=runtime.get_current_profile_id())
|
||||
next_run_in_seconds = runtime.get_next_run_seconds("process_wishlist") if runtime.get_next_run_seconds else 0
|
||||
is_processing = runtime.is_wishlist_actually_processing()
|
||||
current_cycle = _get_wishlist_cycle(runtime.get_music_database)
|
||||
|
||||
payload = build_wishlist_stats_payload(
|
||||
raw_tracks,
|
||||
next_run_in_seconds=next_run_in_seconds,
|
||||
is_auto_processing=is_processing,
|
||||
current_cycle=current_cycle,
|
||||
)
|
||||
return payload, 200
|
||||
except Exception as exc:
|
||||
runtime.logger.error("Error getting wishlist stats: %s", exc)
|
||||
return {"error": str(exc)}, 500
|
||||
|
||||
|
||||
def get_wishlist_cycle(runtime: WishlistRouteRuntime) -> tuple[Dict[str, Any], int]:
|
||||
"""Return the current wishlist cycle."""
|
||||
try:
|
||||
cycle = _get_wishlist_cycle(runtime.get_music_database)
|
||||
return {"cycle": cycle}, 200
|
||||
except Exception as exc:
|
||||
runtime.logger.error("Error getting wishlist cycle: %s", exc)
|
||||
return {"error": str(exc)}, 500
|
||||
|
||||
|
||||
def set_wishlist_cycle(runtime: WishlistRouteRuntime, cycle: str) -> tuple[Dict[str, Any], int]:
|
||||
"""Persist the wishlist cycle."""
|
||||
try:
|
||||
if cycle not in ["albums", "singles"]:
|
||||
return {"error": "Invalid cycle. Must be 'albums' or 'singles'"}, 400
|
||||
|
||||
_set_wishlist_cycle(runtime.get_music_database, cycle)
|
||||
runtime.logger.info("Wishlist cycle set to: %s", cycle)
|
||||
return {"success": True, "cycle": cycle}, 200
|
||||
except Exception as exc:
|
||||
runtime.logger.error("Error setting wishlist cycle: %s", exc)
|
||||
return {"error": str(exc)}, 500
|
||||
|
||||
|
||||
def get_wishlist_tracks(
|
||||
runtime: WishlistRouteRuntime,
|
||||
*,
|
||||
category: str | None = None,
|
||||
limit: int | None = None,
|
||||
) -> tuple[Dict[str, Any], int]:
|
||||
"""Return wishlist tracks for the modal UI."""
|
||||
try:
|
||||
wishlist_service = runtime.get_wishlist_service()
|
||||
db = runtime.get_music_database()
|
||||
|
||||
with runtime.tasks_lock:
|
||||
wishlist_batch_active = any(
|
||||
batch.get("playlist_id") == "wishlist" and batch.get("phase") in ["analysis", "downloading"]
|
||||
for batch in runtime.download_batches.values()
|
||||
)
|
||||
|
||||
if not wishlist_batch_active:
|
||||
duplicates_removed = db.remove_wishlist_duplicates(profile_id=runtime.get_current_profile_id())
|
||||
if duplicates_removed > 0:
|
||||
runtime.logger.warning("Cleaned %s duplicate tracks from wishlist", duplicates_removed)
|
||||
else:
|
||||
runtime.logger.warning("Skipping wishlist duplicate cleanup - download in progress")
|
||||
|
||||
raw_tracks = wishlist_service.get_wishlist_tracks_for_download(profile_id=runtime.get_current_profile_id())
|
||||
prepared = prepare_wishlist_tracks_for_display(raw_tracks, category=category, limit=limit)
|
||||
|
||||
if prepared["duplicates_found"] > 0:
|
||||
runtime.logger.warning(
|
||||
"[API-Wishlist-Tracks] Found and removed %s duplicate tracks during sanitization",
|
||||
prepared["duplicates_found"],
|
||||
)
|
||||
|
||||
if category:
|
||||
runtime.logger.info(
|
||||
"Wishlist filter: %s/%s tracks in '%s' category (limit: %s)",
|
||||
len(prepared["tracks"]),
|
||||
prepared["total"],
|
||||
category,
|
||||
limit or "none",
|
||||
)
|
||||
return {"tracks": prepared["tracks"], "category": category, "total": prepared["total"]}, 200
|
||||
|
||||
return {"tracks": prepared["tracks"], "total": prepared["total"]}, 200
|
||||
except Exception as exc:
|
||||
runtime.logger.error("Error getting wishlist tracks: %s", exc)
|
||||
return {"error": str(exc)}, 500
|
||||
|
||||
|
||||
def clear_wishlist(runtime: WishlistRouteRuntime) -> tuple[Dict[str, Any], int]:
|
||||
"""Clear the wishlist and cancel active wishlist batches."""
|
||||
try:
|
||||
wishlist_service = runtime.get_wishlist_service()
|
||||
success = wishlist_service.clear_wishlist(profile_id=runtime.get_current_profile_id())
|
||||
|
||||
if success:
|
||||
cancelled_count = 0
|
||||
with runtime.tasks_lock:
|
||||
for _batch_id, batch_data in runtime.download_batches.items():
|
||||
if batch_data.get("playlist_id") == "wishlist" and batch_data.get("phase") not in (
|
||||
"complete",
|
||||
"error",
|
||||
"cancelled",
|
||||
):
|
||||
batch_data["phase"] = "cancelled"
|
||||
for task_id in batch_data.get("queue", []):
|
||||
if task_id in runtime.download_tasks and runtime.download_tasks[task_id]["status"] not in (
|
||||
"completed",
|
||||
"failed",
|
||||
"not_found",
|
||||
"cancelled",
|
||||
):
|
||||
runtime.download_tasks[task_id]["status"] = "cancelled"
|
||||
cancelled_count += 1
|
||||
|
||||
runtime.reset_wishlist_processing_state()
|
||||
|
||||
if cancelled_count > 0:
|
||||
runtime.logger.warning("[Wishlist Clear] Cancelled %s active wishlist downloads", cancelled_count)
|
||||
runtime.add_activity_item("", "Wishlist Cleared", f"Wishlist cleared and {cancelled_count} downloads cancelled", "Now")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "Wishlist cleared successfully",
|
||||
"cancelled_downloads": cancelled_count,
|
||||
}, 200
|
||||
|
||||
return {"success": False, "error": "Failed to clear wishlist"}, 500
|
||||
except Exception as exc:
|
||||
runtime.logger.error("Error clearing wishlist: %s", exc)
|
||||
return {"success": False, "error": str(exc)}, 500
|
||||
|
||||
|
||||
def remove_track_from_wishlist(
|
||||
runtime: WishlistRouteRuntime,
|
||||
spotify_track_id: str | None,
|
||||
) -> tuple[Dict[str, Any], int]:
|
||||
"""Remove a single track from the wishlist."""
|
||||
try:
|
||||
if not spotify_track_id:
|
||||
return {"success": False, "error": "No spotify_track_id provided"}, 400
|
||||
|
||||
wishlist_service = runtime.get_wishlist_service()
|
||||
success = wishlist_service.remove_track_from_wishlist(
|
||||
spotify_track_id,
|
||||
profile_id=runtime.get_current_profile_id(),
|
||||
)
|
||||
|
||||
if success:
|
||||
runtime.logger.info("Successfully removed track from wishlist: %s", spotify_track_id)
|
||||
return {"success": True, "message": "Track removed from wishlist"}, 200
|
||||
|
||||
runtime.logger.warning("Failed to remove track from wishlist: %s", spotify_track_id)
|
||||
return {"success": False, "error": "Track not found in wishlist"}, 404
|
||||
except Exception as exc:
|
||||
runtime.logger.error("Error removing track from wishlist: %s", exc)
|
||||
return {"success": False, "error": str(exc)}, 500
|
||||
|
||||
|
||||
def remove_album_from_wishlist(
|
||||
runtime: WishlistRouteRuntime,
|
||||
*,
|
||||
album_id: str | None = None,
|
||||
album_name_filter: str | None = None,
|
||||
) -> tuple[Dict[str, Any], int]:
|
||||
"""Remove every wishlist track that belongs to the selected album."""
|
||||
try:
|
||||
if not album_id and not album_name_filter:
|
||||
return {"success": False, "error": "No album_id or album_name provided"}, 400
|
||||
|
||||
wishlist_service = runtime.get_wishlist_service()
|
||||
all_tracks = wishlist_service.get_wishlist_tracks_for_download(profile_id=runtime.get_current_profile_id())
|
||||
|
||||
tracks_to_remove = []
|
||||
for track in all_tracks:
|
||||
spotify_data = _load_track_spotify_data(track)
|
||||
track_album_id, album_data = _album_lookup_id(spotify_data)
|
||||
|
||||
matched = False
|
||||
if album_id and track_album_id == album_id:
|
||||
matched = True
|
||||
elif album_name_filter:
|
||||
track_album_name = album_data.get("name", "")
|
||||
if isinstance(spotify_data.get("album"), str):
|
||||
track_album_name = spotify_data["album"]
|
||||
if track_album_name and track_album_name.lower().strip() == album_name_filter.lower().strip():
|
||||
matched = True
|
||||
|
||||
if matched:
|
||||
spotify_track_id = track.get("spotify_track_id") or track.get("id")
|
||||
if spotify_track_id:
|
||||
tracks_to_remove.append(spotify_track_id)
|
||||
|
||||
removed_count = 0
|
||||
album_remove_pid = runtime.get_current_profile_id()
|
||||
for spotify_track_id in tracks_to_remove:
|
||||
if wishlist_service.remove_track_from_wishlist(spotify_track_id, profile_id=album_remove_pid):
|
||||
removed_count += 1
|
||||
|
||||
if removed_count > 0:
|
||||
runtime.logger.info("Successfully removed %s tracks from album %s", removed_count, album_id)
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Removed {removed_count} track(s) from wishlist",
|
||||
"removed_count": removed_count,
|
||||
}, 200
|
||||
|
||||
runtime.logger.warning("No tracks found for album %s", album_id)
|
||||
return {"success": False, "error": "No tracks found for this album"}, 404
|
||||
except Exception as exc:
|
||||
runtime.logger.error("Error removing album from wishlist: %s", exc)
|
||||
return {"success": False, "error": str(exc)}, 500
|
||||
|
||||
|
||||
def remove_batch_from_wishlist(
|
||||
runtime: WishlistRouteRuntime,
|
||||
spotify_track_ids,
|
||||
) -> tuple[Dict[str, Any], int]:
|
||||
"""Remove a batch of tracks from the wishlist."""
|
||||
try:
|
||||
if not spotify_track_ids or not isinstance(spotify_track_ids, list):
|
||||
return {"success": False, "error": "Missing or invalid spotify_track_ids"}, 400
|
||||
|
||||
wishlist_service = runtime.get_wishlist_service()
|
||||
removed = 0
|
||||
pid = runtime.get_current_profile_id()
|
||||
for track_id in spotify_track_ids:
|
||||
if wishlist_service.remove_track_from_wishlist(track_id, profile_id=pid):
|
||||
removed += 1
|
||||
|
||||
runtime.logger.info("Batch removed %s track(s) from wishlist", removed)
|
||||
return {
|
||||
"success": True,
|
||||
"removed": removed,
|
||||
"message": f"Removed {removed} track{'s' if removed != 1 else ''} from wishlist",
|
||||
}, 200
|
||||
except Exception as exc:
|
||||
runtime.logger.error("Error batch removing from wishlist: %s", exc)
|
||||
return {"success": False, "error": str(exc)}, 500
|
||||
|
||||
|
||||
def add_album_track_to_wishlist(
|
||||
runtime: WishlistRouteRuntime,
|
||||
*,
|
||||
track: Dict[str, Any] | None,
|
||||
artist: Dict[str, Any] | None,
|
||||
album: Dict[str, Any] | None,
|
||||
source_type: str = "album",
|
||||
source_context: Dict[str, Any] | None = None,
|
||||
) -> tuple[Dict[str, Any], int]:
|
||||
"""Add a single album track to the wishlist."""
|
||||
try:
|
||||
if not track or not artist or not album:
|
||||
return {"success": False, "error": "Missing required fields: track, artist, album"}, 400
|
||||
|
||||
spotify_track_data = _build_spotify_track_data(track, album)
|
||||
|
||||
enhanced_source_context = {
|
||||
**(source_context or {}),
|
||||
"artist_id": artist.get("id"),
|
||||
"artist_name": artist.get("name"),
|
||||
"album_id": album.get("id"),
|
||||
"album_name": album.get("name"),
|
||||
"added_via": "library_wishlist_modal",
|
||||
}
|
||||
|
||||
wishlist_service = runtime.get_wishlist_service()
|
||||
success = wishlist_service.add_spotify_track_to_wishlist(
|
||||
spotify_track_data=spotify_track_data,
|
||||
failure_reason="Added from library (incomplete album)",
|
||||
source_type=source_type,
|
||||
source_context=enhanced_source_context,
|
||||
profile_id=runtime.get_current_profile_id(),
|
||||
)
|
||||
|
||||
if success:
|
||||
runtime.logger.info("Added track '%s' by '%s' to wishlist", track.get("name"), artist.get("name"))
|
||||
return {"success": True, "message": f"Added '{track.get('name')}' to wishlist"}, 200
|
||||
|
||||
runtime.logger.error("Failed to add track '%s' to wishlist", track.get("name"))
|
||||
return {"success": False, "error": "Failed to add track to wishlist"}, 200
|
||||
except Exception as exc:
|
||||
runtime.logger.error("Error adding track to wishlist: %s", exc)
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
return {"success": False, "error": str(exc)}, 500
|
||||
|
||||
|
||||
__all__ = [
|
||||
"WishlistRouteRuntime",
|
||||
"process_wishlist_api",
|
||||
"get_wishlist_count",
|
||||
"get_wishlist_stats",
|
||||
"get_wishlist_cycle",
|
||||
"set_wishlist_cycle",
|
||||
"get_wishlist_tracks",
|
||||
"clear_wishlist",
|
||||
"remove_track_from_wishlist",
|
||||
"remove_album_from_wishlist",
|
||||
"remove_batch_from_wishlist",
|
||||
"add_album_track_to_wishlist",
|
||||
]
|
||||
@ -0,0 +1,487 @@
|
||||
import json
|
||||
|
||||
from core.wishlist.routes import (
|
||||
WishlistRouteRuntime,
|
||||
add_album_track_to_wishlist,
|
||||
clear_wishlist,
|
||||
get_wishlist_count,
|
||||
get_wishlist_cycle,
|
||||
get_wishlist_stats,
|
||||
get_wishlist_tracks,
|
||||
process_wishlist_api,
|
||||
remove_album_from_wishlist,
|
||||
remove_batch_from_wishlist,
|
||||
remove_track_from_wishlist,
|
||||
set_wishlist_cycle,
|
||||
)
|
||||
|
||||
|
||||
class _FakeLogger:
|
||||
def __init__(self):
|
||||
self.info_messages = []
|
||||
self.warning_messages = []
|
||||
self.error_messages = []
|
||||
self.debug_messages = []
|
||||
|
||||
def info(self, msg, *args):
|
||||
self.info_messages.append(msg % args if args else msg)
|
||||
|
||||
def warning(self, msg, *args):
|
||||
self.warning_messages.append(msg % args if args else msg)
|
||||
|
||||
def error(self, msg, *args):
|
||||
self.error_messages.append(msg % args if args else msg)
|
||||
|
||||
def debug(self, msg, *args):
|
||||
self.debug_messages.append(msg % args if args else msg)
|
||||
|
||||
|
||||
class _FakeThread:
|
||||
def __init__(self, target=None, daemon=False):
|
||||
self.target = target
|
||||
self.daemon = daemon
|
||||
self.started = False
|
||||
|
||||
def start(self):
|
||||
self.started = True
|
||||
if self.target:
|
||||
self.target()
|
||||
|
||||
|
||||
class _FakeThreadFactory:
|
||||
def __init__(self):
|
||||
self.created = []
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
thread = _FakeThread(*args, **kwargs)
|
||||
self.created.append(thread)
|
||||
return thread
|
||||
|
||||
|
||||
class _FakeLock:
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
|
||||
class _FakeCursor:
|
||||
def __init__(self, db):
|
||||
self.db = db
|
||||
self.last_sql = ""
|
||||
|
||||
def execute(self, sql, params=None):
|
||||
self.last_sql = sql
|
||||
if "INSERT OR REPLACE INTO metadata" in sql and params:
|
||||
self.db.cycle_value = params[0]
|
||||
|
||||
def fetchone(self):
|
||||
if "SELECT value FROM metadata WHERE key = 'wishlist_cycle'" in self.last_sql:
|
||||
return {"value": self.db.cycle_value}
|
||||
return None
|
||||
|
||||
|
||||
class _FakeConnection:
|
||||
def __init__(self, db):
|
||||
self.db = db
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def cursor(self):
|
||||
return self.db.cursor_obj
|
||||
|
||||
def commit(self):
|
||||
self.db.commits += 1
|
||||
|
||||
|
||||
class _FakeMusicDatabase:
|
||||
def __init__(self, cycle_value="albums", duplicate_removals=0):
|
||||
self.cycle_value = cycle_value
|
||||
self.duplicate_removals = duplicate_removals
|
||||
self.commits = 0
|
||||
self.cursor_obj = _FakeCursor(self)
|
||||
self.duplicate_cleanup_profiles = []
|
||||
|
||||
def _get_connection(self):
|
||||
return _FakeConnection(self)
|
||||
|
||||
def remove_wishlist_duplicates(self, profile_id=1):
|
||||
self.duplicate_cleanup_profiles.append(profile_id)
|
||||
return self.duplicate_removals
|
||||
|
||||
|
||||
class _FakeWishlistService:
|
||||
def __init__(self, tracks=None, count=None, clear_result=True):
|
||||
self.tracks = list(tracks or [])
|
||||
self.count = len(self.tracks) if count is None else count
|
||||
self.clear_result = clear_result
|
||||
self.removed = []
|
||||
self.add_calls = []
|
||||
|
||||
def get_wishlist_count(self, profile_id=1):
|
||||
return self.count
|
||||
|
||||
def get_wishlist_tracks_for_download(self, profile_id=1):
|
||||
return list(self.tracks)
|
||||
|
||||
def clear_wishlist(self, profile_id=1):
|
||||
return self.clear_result
|
||||
|
||||
def remove_track_from_wishlist(self, spotify_track_id, profile_id=1):
|
||||
self.removed.append((spotify_track_id, profile_id))
|
||||
return True
|
||||
|
||||
def add_spotify_track_to_wishlist(self, **kwargs):
|
||||
self.add_calls.append(kwargs)
|
||||
return True
|
||||
|
||||
|
||||
def _build_runtime(
|
||||
*,
|
||||
tracks=None,
|
||||
count=None,
|
||||
cycle_value="albums",
|
||||
duplicate_removals=0,
|
||||
clear_result=True,
|
||||
auto_processing_flag=False,
|
||||
actually_processing=False,
|
||||
next_run_seconds=0,
|
||||
download_batches=None,
|
||||
download_tasks=None,
|
||||
thread_factory=None,
|
||||
reset_callback=None,
|
||||
):
|
||||
service = _FakeWishlistService(tracks=tracks, count=count, clear_result=clear_result)
|
||||
db = _FakeMusicDatabase(cycle_value=cycle_value, duplicate_removals=duplicate_removals)
|
||||
logger = _FakeLogger()
|
||||
activity_calls = []
|
||||
runtime = WishlistRouteRuntime(
|
||||
get_wishlist_service=lambda: service,
|
||||
get_music_database=lambda: db,
|
||||
get_current_profile_id=lambda: 1,
|
||||
download_batches=download_batches if download_batches is not None else {},
|
||||
download_tasks=download_tasks if download_tasks is not None else {},
|
||||
tasks_lock=_FakeLock(),
|
||||
is_wishlist_auto_processing_flag=lambda: auto_processing_flag,
|
||||
is_wishlist_actually_processing=lambda: actually_processing,
|
||||
reset_wishlist_processing_state=reset_callback or (lambda: None),
|
||||
add_activity_item=lambda *args: activity_calls.append(args),
|
||||
logger=logger,
|
||||
active_server="navidrome",
|
||||
get_next_run_seconds=(lambda _name: next_run_seconds),
|
||||
thread_factory=thread_factory or _FakeThreadFactory(),
|
||||
)
|
||||
return runtime, service, db, logger, activity_calls
|
||||
|
||||
|
||||
def test_process_wishlist_api_starts_background_thread_when_idle():
|
||||
thread_factory = _FakeThreadFactory()
|
||||
runtime, _service, _db, logger, _activity_calls = _build_runtime(
|
||||
thread_factory=thread_factory,
|
||||
)
|
||||
start_calls = []
|
||||
|
||||
payload, status = process_wishlist_api(runtime, start_processing=lambda: start_calls.append("ran"))
|
||||
|
||||
assert status == 200
|
||||
assert payload == {"success": True, "message": "Wishlist processing started"}
|
||||
assert start_calls == ["ran"]
|
||||
assert len(thread_factory.created) == 1
|
||||
assert thread_factory.created[0].daemon is True
|
||||
assert thread_factory.created[0].started is True
|
||||
assert logger.error_messages == []
|
||||
|
||||
|
||||
def test_process_wishlist_api_rejects_when_flag_is_set():
|
||||
thread_factory = _FakeThreadFactory()
|
||||
runtime, _service, _db, logger, _activity_calls = _build_runtime(
|
||||
auto_processing_flag=True,
|
||||
thread_factory=thread_factory,
|
||||
)
|
||||
|
||||
payload, status = process_wishlist_api(runtime, start_processing=lambda: None)
|
||||
|
||||
assert status == 409
|
||||
assert payload == {"success": False, "error": "Wishlist processing already in progress"}
|
||||
assert thread_factory.created == []
|
||||
assert logger.error_messages == []
|
||||
|
||||
|
||||
def test_get_wishlist_count_returns_profile_count():
|
||||
runtime, _service, _db, _logger, _activity_calls = _build_runtime(count=7)
|
||||
|
||||
payload, status = get_wishlist_count(runtime)
|
||||
|
||||
assert status == 200
|
||||
assert payload == {"count": 7}
|
||||
|
||||
|
||||
def test_get_wishlist_stats_uses_cycle_and_next_run():
|
||||
tracks = [
|
||||
{
|
||||
"id": "track-1",
|
||||
"name": "Single Song",
|
||||
"artists": [{"name": "Artist One"}],
|
||||
"spotify_data": {"album": {"album_type": "single"}},
|
||||
},
|
||||
{
|
||||
"id": "track-2",
|
||||
"name": "Album Song",
|
||||
"artists": [{"name": "Artist Two"}],
|
||||
"spotify_data": {"album": {"total_tracks": 8}},
|
||||
},
|
||||
]
|
||||
runtime, _service, _db, _logger, _activity_calls = _build_runtime(
|
||||
tracks=tracks,
|
||||
count=2,
|
||||
cycle_value="albums",
|
||||
actually_processing=True,
|
||||
next_run_seconds=123,
|
||||
)
|
||||
|
||||
payload, status = get_wishlist_stats(runtime)
|
||||
|
||||
assert status == 200
|
||||
assert payload == {
|
||||
"singles": 1,
|
||||
"albums": 1,
|
||||
"total": 2,
|
||||
"next_run_in_seconds": 123,
|
||||
"is_auto_processing": True,
|
||||
"current_cycle": "albums",
|
||||
}
|
||||
|
||||
|
||||
def test_get_wishlist_tracks_filters_category_and_cleans_duplicates():
|
||||
tracks = [
|
||||
{
|
||||
"id": "track-1",
|
||||
"name": "Single One",
|
||||
"artists": [{"name": "Artist One"}],
|
||||
"spotify_data": {"album": {"album_type": "single"}},
|
||||
},
|
||||
{
|
||||
"id": "track-2",
|
||||
"name": "Single Two",
|
||||
"artists": [{"name": "Artist Two"}],
|
||||
"spotify_data": {"album": {"album_type": "single"}},
|
||||
},
|
||||
{
|
||||
"id": "track-3",
|
||||
"name": "Album Song",
|
||||
"artists": [{"name": "Artist Three"}],
|
||||
"spotify_data": {"album": {"total_tracks": 8}},
|
||||
},
|
||||
]
|
||||
runtime, service, db, logger, _activity_calls = _build_runtime(
|
||||
tracks=tracks,
|
||||
duplicate_removals=2,
|
||||
)
|
||||
|
||||
payload, status = get_wishlist_tracks(runtime, category="singles", limit=1)
|
||||
|
||||
assert status == 200
|
||||
assert payload["category"] == "singles"
|
||||
assert payload["total"] == 2
|
||||
assert len(payload["tracks"]) == 1
|
||||
assert payload["tracks"][0]["id"] == "track-1"
|
||||
assert db.duplicate_cleanup_profiles == [1]
|
||||
assert any("duplicate tracks from wishlist" in msg for msg in logger.warning_messages)
|
||||
assert service.get_wishlist_tracks_for_download(profile_id=1)[0]["id"] == "track-1"
|
||||
|
||||
|
||||
def test_clear_wishlist_cancels_active_batches_and_resets_state():
|
||||
download_batches = {
|
||||
"batch-1": {
|
||||
"playlist_id": "wishlist",
|
||||
"phase": "analysis",
|
||||
"queue": ["task-1", "task-2", "task-3"],
|
||||
},
|
||||
"batch-2": {
|
||||
"playlist_id": "other",
|
||||
"phase": "analysis",
|
||||
"queue": ["task-4"],
|
||||
},
|
||||
}
|
||||
download_tasks = {
|
||||
"task-1": {"status": "queued"},
|
||||
"task-2": {"status": "in_progress"},
|
||||
"task-3": {"status": "completed"},
|
||||
"task-4": {"status": "queued"},
|
||||
}
|
||||
reset_calls = []
|
||||
runtime, service, _db, logger, activity_calls = _build_runtime(
|
||||
download_batches=download_batches,
|
||||
download_tasks=download_tasks,
|
||||
reset_callback=lambda: reset_calls.append("reset"),
|
||||
)
|
||||
|
||||
payload, status = clear_wishlist(runtime)
|
||||
|
||||
assert status == 200
|
||||
assert payload == {
|
||||
"success": True,
|
||||
"message": "Wishlist cleared successfully",
|
||||
"cancelled_downloads": 2,
|
||||
}
|
||||
assert service.clear_result is True
|
||||
assert download_batches["batch-1"]["phase"] == "cancelled"
|
||||
assert download_batches["batch-2"]["phase"] == "analysis"
|
||||
assert download_tasks["task-1"]["status"] == "cancelled"
|
||||
assert download_tasks["task-2"]["status"] == "cancelled"
|
||||
assert download_tasks["task-3"]["status"] == "completed"
|
||||
assert download_tasks["task-4"]["status"] == "queued"
|
||||
assert reset_calls == ["reset"]
|
||||
assert activity_calls == [
|
||||
("", "Wishlist Cleared", "Wishlist cleared and 2 downloads cancelled", "Now")
|
||||
]
|
||||
assert any("Cancelled 2 active wishlist downloads" in msg for msg in logger.warning_messages)
|
||||
|
||||
|
||||
def test_remove_track_from_wishlist_requires_track_id():
|
||||
runtime, _service, _db, _logger, _activity_calls = _build_runtime()
|
||||
|
||||
payload, status = remove_track_from_wishlist(runtime, None)
|
||||
|
||||
assert status == 400
|
||||
assert payload == {"success": False, "error": "No spotify_track_id provided"}
|
||||
|
||||
|
||||
def test_remove_track_from_wishlist_removes_single_track():
|
||||
runtime, service, _db, _logger, _activity_calls = _build_runtime()
|
||||
|
||||
payload, status = remove_track_from_wishlist(runtime, "track-1")
|
||||
|
||||
assert status == 200
|
||||
assert payload == {"success": True, "message": "Track removed from wishlist"}
|
||||
assert service.removed == [("track-1", 1)]
|
||||
|
||||
|
||||
def test_remove_album_from_wishlist_matches_album_name():
|
||||
tracks = [
|
||||
{
|
||||
"wishlist_id": 1,
|
||||
"spotify_track_id": "track-1",
|
||||
"id": "track-1",
|
||||
"spotify_data": json.dumps(
|
||||
{
|
||||
"album": {"name": "Complete Album"},
|
||||
"artists": [{"name": "Artist One"}],
|
||||
}
|
||||
),
|
||||
},
|
||||
{
|
||||
"wishlist_id": 2,
|
||||
"spotify_track_id": "track-2",
|
||||
"id": "track-2",
|
||||
"spotify_data": {
|
||||
"album": {"name": "Other Album"},
|
||||
"artists": [{"name": "Artist Two"}],
|
||||
},
|
||||
},
|
||||
]
|
||||
runtime, service, _db, _logger, _activity_calls = _build_runtime(tracks=tracks)
|
||||
|
||||
payload, status = remove_album_from_wishlist(runtime, album_name_filter="complete album")
|
||||
|
||||
assert status == 200
|
||||
assert payload == {
|
||||
"success": True,
|
||||
"message": "Removed 1 track(s) from wishlist",
|
||||
"removed_count": 1,
|
||||
}
|
||||
assert service.removed == [("track-1", 1)]
|
||||
|
||||
|
||||
def test_remove_batch_from_wishlist_returns_removed_count():
|
||||
runtime, service, _db, _logger, _activity_calls = _build_runtime()
|
||||
|
||||
payload, status = remove_batch_from_wishlist(runtime, ["track-1", "track-2"])
|
||||
|
||||
assert status == 200
|
||||
assert payload == {
|
||||
"success": True,
|
||||
"removed": 2,
|
||||
"message": "Removed 2 tracks from wishlist",
|
||||
}
|
||||
assert service.removed == [("track-1", 1), ("track-2", 1)]
|
||||
|
||||
|
||||
def test_set_wishlist_cycle_updates_metadata():
|
||||
runtime, _service, db, _logger, _activity_calls = _build_runtime(cycle_value="albums")
|
||||
|
||||
payload, status = set_wishlist_cycle(runtime, "singles")
|
||||
|
||||
assert status == 200
|
||||
assert payload == {"success": True, "cycle": "singles"}
|
||||
assert db.cycle_value == "singles"
|
||||
assert db.commits == 1
|
||||
|
||||
|
||||
def test_get_wishlist_cycle_returns_stored_value():
|
||||
runtime, _service, _db, _logger, _activity_calls = _build_runtime(cycle_value="singles")
|
||||
|
||||
payload, status = get_wishlist_cycle(runtime)
|
||||
|
||||
assert status == 200
|
||||
assert payload == {"cycle": "singles"}
|
||||
|
||||
|
||||
def test_add_album_track_to_wishlist_builds_spotify_payload_and_merges_context():
|
||||
runtime, service, _db, _logger, _activity_calls = _build_runtime()
|
||||
track = {
|
||||
"id": "track-1",
|
||||
"name": "Song One",
|
||||
"artists": [{"name": "Artist One"}],
|
||||
"duration_ms": 1234,
|
||||
"track_number": 2,
|
||||
"disc_number": 1,
|
||||
"explicit": True,
|
||||
"popularity": 77,
|
||||
"preview_url": "https://example.test/preview",
|
||||
"external_urls": {"spotify": "https://open.spotify.com/track/track-1"},
|
||||
}
|
||||
artist = {"id": "artist-1", "name": "Artist One"}
|
||||
album = {
|
||||
"id": "album-1",
|
||||
"name": "Album One",
|
||||
"artists": [{"name": "Artist One"}],
|
||||
"image_url": "https://example.test/cover.jpg",
|
||||
"release_date": "2024-01-01",
|
||||
"total_tracks": 10,
|
||||
}
|
||||
|
||||
payload, status = add_album_track_to_wishlist(
|
||||
runtime,
|
||||
track=track,
|
||||
artist=artist,
|
||||
album=album,
|
||||
source_type="album",
|
||||
source_context={"playlist_id": "pl-1"},
|
||||
)
|
||||
|
||||
assert status == 200
|
||||
assert payload == {"success": True, "message": "Added 'Song One' to wishlist"}
|
||||
assert len(service.add_calls) == 1
|
||||
add_call = service.add_calls[0]
|
||||
assert add_call["failure_reason"] == "Added from library (incomplete album)"
|
||||
assert add_call["source_type"] == "album"
|
||||
assert add_call["profile_id"] == 1
|
||||
assert add_call["source_context"] == {
|
||||
"playlist_id": "pl-1",
|
||||
"artist_id": "artist-1",
|
||||
"artist_name": "Artist One",
|
||||
"album_id": "album-1",
|
||||
"album_name": "Album One",
|
||||
"added_via": "library_wishlist_modal",
|
||||
}
|
||||
assert add_call["spotify_track_data"]["album"]["images"] == [
|
||||
{"url": "https://example.test/cover.jpg", "height": 640, "width": 640}
|
||||
]
|
||||
assert add_call["spotify_track_data"]["duration_ms"] == 1234
|
||||
assert add_call["spotify_track_data"]["explicit"] is True
|
||||
Loading…
Reference in new issue