You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/navidrome_client.py

1434 lines
60 KiB

import requests
import hashlib
import secrets
import time
from typing import List, Optional, Dict, Any
from datetime import datetime
from urllib.parse import urlencode
import json
from utils.logging_config import get_logger
from config.settings import config_manager
# Shared dataclasses live in the neutral media_server package — every
# server client used to define a near-identical XTrackInfo /
# XPlaylistInfo. Lifted to one canonical type so consumers (matching
# engine, sync service) get a single import.
from core.media_server.types import TrackInfo, PlaylistInfo
logger = get_logger("navidrome_client")
class NavidromeArtist:
    """Wrapper around a Navidrome artist dict that mimics the Plex artist interface.

    Exposes ``ratingKey`` (the Navidrome id), ``title``, ``addedAt``,
    ``genres``, ``summary`` and ``thumb`` as plain attributes.
    """

    def __init__(self, navidrome_data: Dict[str, Any], client: 'NavidromeClient'):
        """Populate the Plex-style attributes from ``navidrome_data``.

        Args:
            navidrome_data: Raw artist dict as returned by the Subsonic API.
            client: Owning client; used by :meth:`albums` for follow-up calls.
        """
        self._data = navidrome_data
        self._client = client
        self.ratingKey = navidrome_data.get('id', '')
        self.title = navidrome_data.get('name', 'Unknown Artist')
        self.addedAt = self._parse_date(navidrome_data.get('dateAdded'))
        # Genres are not mapped yet; the attribute exists so readers do not fail.
        # TODO: Map Navidrome genre data to match Plex format
        self.genres = []
        # Summary (used for timestamp storage); a missing/None biography becomes ''.
        self.summary = navidrome_data.get('biography', '') or ''
        # Relative cover-art path, or None when the artist has no id.
        self.thumb = self._get_artist_image_url()

    def _parse_date(self, date_str: Optional[str]) -> Optional[datetime]:
        """Parse an ISO-format date string; return None when absent or unparseable."""
        if not date_str:
            return None
        try:
            # A trailing 'Z' is rewritten to an explicit UTC offset before parsing.
            return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # Only parse failures are swallowed: non-string input or a malformed
            # date. KeyboardInterrupt/SystemExit are no longer caught.
            return None

    def _get_artist_image_url(self) -> Optional[str]:
        """Return the relative Subsonic getCoverArt path for this artist, or None without an id."""
        if not self.ratingKey:
            return None
        return f"/rest/getCoverArt?id={self.ratingKey}"

    def albums(self) -> List['NavidromeAlbum']:
        """Return all albums for this artist via the owning client."""
        return self._client.get_albums_for_artist(self.ratingKey)
class NavidromeAlbum:
    """Wrapper around a Navidrome album dict that mimics the Plex album interface.

    Exposes ``ratingKey`` (the Navidrome id), ``title``, ``year``,
    ``addedAt`` and ``thumb`` as plain attributes.
    """

    def __init__(self, navidrome_data: Dict[str, Any], client: 'NavidromeClient'):
        """Populate the Plex-style attributes from ``navidrome_data``.

        Args:
            navidrome_data: Raw album dict as returned by the Subsonic API.
            client: Owning client; used by :meth:`artist` and :meth:`tracks`.
        """
        self._data = navidrome_data
        self._client = client
        self.ratingKey = navidrome_data.get('id', '')
        self.title = navidrome_data.get('name', 'Unknown Album')
        self.year = navidrome_data.get('year')
        self.addedAt = self._parse_date(navidrome_data.get('created'))
        self._artist_id = navidrome_data.get('artistId', '')
        self.thumb = self._get_album_image_url()

    def _parse_date(self, date_str: Optional[str]) -> Optional[datetime]:
        """Parse an ISO-format date string; return None when absent or unparseable."""
        if not date_str:
            return None
        try:
            # A trailing 'Z' is rewritten to an explicit UTC offset before parsing.
            return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # Only parse failures are swallowed: non-string input or a malformed
            # date. KeyboardInterrupt/SystemExit are no longer caught.
            return None

    def _get_album_image_url(self) -> Optional[str]:
        """Generate a Subsonic getCoverArt URL for this album.

        Navidrome exposes the stable artwork key as ``coverArt``. Falling
        back to the album ID keeps compatibility with older responses while
        ensuring library refreshes do not mark albums as artless.
        """
        cover_id = self._data.get('coverArt') or self.ratingKey
        if not cover_id:
            return None
        return f"/rest/getCoverArt?id={cover_id}"

    def artist(self) -> Optional['NavidromeArtist']:
        """Return the album artist via the owning client, or None without an artist id."""
        if self._artist_id:
            return self._client.get_artist_by_id(self._artist_id)
        return None

    def tracks(self) -> List['NavidromeTrack']:
        """Return all tracks for this album via the owning client."""
        return self._client.get_tracks_for_album(self.ratingKey)
class NavidromeTrack:
    """Wrapper around a Navidrome song dict that mimics the Plex track interface.

    Exposes ``ratingKey`` (the Navidrome id), ``title``, ``duration``
    (milliseconds), ``trackNumber``, ``year``, ``userRating``, ``addedAt``
    plus the file/quality fields ``suffix``, ``bitRate``, ``path``,
    ``file_size`` and ``musicBrainzId``.
    """

    def __init__(self, navidrome_data: Dict[str, Any], client: 'NavidromeClient'):
        """Populate the Plex-style attributes from ``navidrome_data``.

        Args:
            navidrome_data: Raw song dict as returned by the Subsonic API.
            client: Owning client; used by :meth:`artist` and :meth:`album`.
        """
        self._data = navidrome_data
        self._client = client
        self.ratingKey = navidrome_data.get('id', '')
        self.title = navidrome_data.get('title', 'Unknown Track')
        # Seconds -> milliseconds. ``or 0`` also covers an explicit null
        # duration, which previously raised TypeError in the constructor.
        self.duration = (navidrome_data.get('duration') or 0) * 1000
        self.trackNumber = navidrome_data.get('track')
        self.year = navidrome_data.get('year')
        self.userRating = navidrome_data.get('userRating')
        self.addedAt = self._parse_date(navidrome_data.get('created'))
        # Subsonic API file/quality fields
        self.suffix = navidrome_data.get('suffix')  # e.g. "flac", "mp3"
        self.bitRate = navidrome_data.get('bitRate')  # e.g. 320
        self.path = navidrome_data.get('path')  # e.g. "/music/Artist/Album/track.flac"
        # File size in bytes (Subsonic <song size="..."/>) — powers the
        # Library Disk Usage card on Stats. None when the server didn't
        # report a size (rare but possible for streaming-only nodes).
        _nv_size = navidrome_data.get('size')
        try:
            self.file_size = int(_nv_size) if _nv_size else None
        except (TypeError, ValueError):
            self.file_size = None
        self._album_id = navidrome_data.get('albumId', '')
        self._artist_id = navidrome_data.get('artistId', '')
        self.musicBrainzId = navidrome_data.get('musicBrainzId')

    def _parse_date(self, date_str: Optional[str]) -> Optional[datetime]:
        """Parse an ISO-format date string; return None when absent or unparseable."""
        if not date_str:
            return None
        try:
            # A trailing 'Z' is rewritten to an explicit UTC offset before parsing.
            return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # Only parse failures are swallowed: non-string input or a malformed
            # date. KeyboardInterrupt/SystemExit are no longer caught.
            return None

    def artist(self) -> Optional['NavidromeArtist']:
        """Return the track artist via the owning client, or None without an artist id."""
        if self._artist_id:
            return self._client.get_artist_by_id(self._artist_id)
        return None

    def album(self) -> Optional['NavidromeAlbum']:
        """Return the track's album via the owning client, or None without an album id."""
        if self._album_id:
            return self._client.get_album_by_id(self._album_id)
        return None
from core.media_server.contract import MediaServerClient
class NavidromeClient(MediaServerClient):
def __init__(self):
    """Create a client in the disconnected state.

    Only attributes are initialised here; the body performs no config
    reads and no network calls.
    NOTE(review): the base-class initialiser is not invoked — confirm
    MediaServerClient needs no setup of its own.
    """
    # Credentials and folder selection; all None until a connect succeeds.
    self.base_url: Optional[str] = None
    self.username: Optional[str] = None
    self.password: Optional[str] = None
    self.music_folder_id: Optional[str] = None

    # Connection bookkeeping read by ensure_connection().
    self._connection_attempted = self._is_connecting = False
    self._last_connect_attempt = 0.0  # monotonic time of the last connect try

    # Lookup caches (one independent dict each).
    self._artist_cache = dict()
    self._album_cache = dict()
    self._track_cache = dict()
    # Album IDs in the selected music folder; None until built.
    self._folder_album_ids: Optional[set] = None

    # Optional callable that receives progress messages for the UI.
    self._progress_callback = None
def set_progress_callback(self, callback):
    """Register the callable that receives progress messages.

    The stored value is only invoked when truthy, so passing ``None``
    switches progress reporting off.
    """
    self._progress_callback = callback
def reload_config(self):
    """Drop credentials, folder selection and caches.

    Clearing ``_connection_attempted`` as well means the next
    ensure_connection() call is not throttled and re-reads the config.
    """
    self.base_url = self.username = self.password = None
    self.music_folder_id = None
    self._folder_album_ids = None
    self._connection_attempted = False
    # The caches are emptied in place rather than replaced.
    for cache in (self._artist_cache, self._album_cache, self._track_cache):
        cache.clear()
    logger.info("Navidrome client config reset — will reconnect with new settings")
def get_music_folders(self) -> list:
    """Return the available music folders, or ``[]`` when no connection can be made."""
    return self._fetch_music_folders() if self.ensure_connection() else []
def _fetch_music_folders(self) -> list:
    """Request ``getMusicFolders`` and normalise the result.

    Unlike get_music_folders() this does NOT call ensure_connection(), so
    it can be used from inside _setup_client while the connection guard
    (``_is_connecting``) is still set; the public method would bail out
    there and return an empty list.

    Returns:
        A list of ``{'title': <name>, 'key': <id as str>}`` dicts; ``[]``
        when the request yields nothing or any error occurs.
    """
    try:
        payload = self._make_request('getMusicFolders')
        if not payload:
            return []
        raw = payload.get('musicFolders', {}).get('musicFolder', [])
        # A single folder may arrive as a bare dict instead of a list.
        entries = [raw] if isinstance(raw, dict) else raw
        normalised = []
        for entry in entries:
            normalised.append({
                'title': entry.get('name', 'Unknown'),
                'key': str(entry.get('id', '')),
            })
        return normalised
    except Exception as e:
        logger.error(f"Error getting music folders: {e}")
        return []
def set_music_folder_by_name(self, folder_name: str) -> bool:
    """Select the music folder titled ``folder_name`` and persist the choice.

    An empty ``folder_name`` that matches no folder clears the selection.

    Returns:
        True when a folder was selected or the selection was cleared;
        False when the name is unknown or any error occurs.
    """
    try:
        chosen = next(
            (entry for entry in self.get_music_folders() if entry['title'] == folder_name),
            None,
        )
        if chosen is not None:
            self.music_folder_id = chosen['key']
            self._folder_album_ids = None  # Invalidate album filter cache
            logger.info(f"Set music folder to: {folder_name} (ID: {self.music_folder_id})")
            from database.music_database import MusicDatabase
            store = MusicDatabase()
            # Persist the id (stable across renames) as the primary key,
            # keep the name for display + back-compat fallback.
            store.set_preference('navidrome_music_folder', folder_name)
            store.set_preference('navidrome_music_folder_id', chosen['key'])
            return True

        if folder_name:
            logger.warning(f"Music folder '{folder_name}' not found")
            return False

        # Empty name and no folder with an empty title: clear the selection.
        self.music_folder_id = None
        self._folder_album_ids = None  # Invalidate album filter cache
        from database.music_database import MusicDatabase
        store = MusicDatabase()
        store.set_preference('navidrome_music_folder', '')
        store.set_preference('navidrome_music_folder_id', '')
        logger.info("Cleared music folder selection — will use all libraries")
        return True
    except Exception as e:
        logger.error(f"Error setting music folder: {e}")
        return False
# Minimum number of seconds between automatic reconnect attempts. A failed
# connect used to latch the client "disconnected" until the user hit the
# manual Test button (a transient ping failure nukes the creds in
# _setup_client); retrying at most this often lets it self-heal.
_RECONNECT_THROTTLE_S = 20.0

def ensure_connection(self) -> bool:
    """Return True when connected, connecting lazily and retrying after failures.

    Order of checks:
      1. Already connected (``base_url`` and ``username`` set) -> True.
      2. A connect is in progress (``_is_connecting``) -> False.
      3. A previous attempt happened less than ``_RECONNECT_THROTTLE_S``
         seconds ago -> False, so a down server is not hammered.
      4. Otherwise run _setup_client() and report whether it connected.
    """
    def connected() -> bool:
        return self.base_url is not None and self.username is not None

    if connected():
        return True
    if self._is_connecting:
        return False
    if self._connection_attempted:
        since_last_try = time.monotonic() - self._last_connect_attempt
        if since_last_try < self._RECONNECT_THROTTLE_S:
            return False

    self._is_connecting = True
    self._last_connect_attempt = time.monotonic()
    try:
        self._setup_client()
        return connected()
    finally:
        # Runs on success, failure and exception alike.
        self._is_connecting = False
        self._connection_attempted = True
def _setup_client(self):
    """Read the Navidrome config, verify it with a ping, then restore the saved folder.

    With incomplete config nothing is assigned. Otherwise the credentials
    are stored and a ``ping`` is issued: on success the persisted
    music-folder preference (if any) is re-applied; if the ping fails or
    raises, ``base_url``/``username``/``password`` are reset to ``None``.
    """
    settings = config_manager.get_navidrome_config()
    if not settings.get('base_url'):
        logger.warning("Navidrome server URL not configured")
        return
    if not (settings.get('username') and settings.get('password')):
        logger.warning("Navidrome username/password not configured")
        return

    self.base_url = settings['base_url'].rstrip('/')
    self.username = settings['username']
    self.password = settings['password']

    try:
        # Test connection with ping
        ping = self._make_request('ping')
        if not (ping and ping.get('status') == 'ok'):
            logger.error(f"Failed to connect to Navidrome server at {self.base_url}/rest/ping — check URL and network connectivity")
            self.base_url = self.username = self.password = None
            return

        server_version = ping.get('version', 'Unknown')
        logger.info(f"Successfully connected to Navidrome server version: {server_version}")

        # Restore saved music folder preference. Best effort: a failure
        # here is logged and leaves the connection itself intact.
        try:
            from database.music_database import MusicDatabase
            prefs = MusicDatabase()
            saved_id = prefs.get_preference('navidrome_music_folder_id')
            saved_folder = prefs.get_preference('navidrome_music_folder')
            if saved_id or saved_folder:
                # Use the non-reentrant fetch: we're still inside
                # ensure_connection() here (_is_connecting=True), so the
                # public get_music_folders() would re-enter the guard and
                # return [], silently dropping the saved selection.
                folders = self._fetch_music_folders()

                # Match by id first (stable across renames in Navidrome);
                # fall back to name for installs saved before the id was
                # persisted.
                matched = None
                if saved_id:
                    wanted_key = str(saved_id)
                    for candidate in folders:
                        if candidate['key'] == wanted_key:
                            matched = candidate
                            break
                if matched is None and saved_folder:
                    for candidate in folders:
                        if candidate['title'] == saved_folder:
                            matched = candidate
                            break

                if matched is not None:
                    self.music_folder_id = matched['key']
                    logger.info(f"Restored music folder preference: {matched['title']} (ID: {self.music_folder_id})")
                    # Self-heal drifted prefs: a pre-id install (no saved
                    # id) or a folder renamed in Navidrome (stale name).
                    # id stays the durable key; name is kept fresh so the
                    # settings dropdown highlights the right option.
                    id_drifted = str(saved_id or '') != matched['key']
                    name_drifted = (saved_folder or '') != matched['title']
                    if id_drifted or name_drifted:
                        try:
                            prefs.set_preference('navidrome_music_folder_id', matched['key'])
                            prefs.set_preference('navidrome_music_folder', matched['title'])
                        except Exception as heal_err:
                            logger.debug(f"Could not self-heal music folder prefs: {heal_err}")
        except Exception as e:
            logger.warning(f"Could not restore music folder preference: {e}")
    except Exception as e:
        logger.error(f"Failed to connect to Navidrome server at {self.base_url}/rest/ping: {e}")
        self.base_url = self.username = self.password = None
def _generate_auth_params(self) -> Dict[str, str]:
    """Build the Subsonic token-auth query parameters for one request.

    Returns:
        ``{}`` when username or password is missing; otherwise a dict with
        ``u`` (user), ``t`` (md5 of password + salt), ``s`` (a fresh random
        salt), ``v`` (API version), ``c`` (client name) and ``f`` (format).
    """
    if not (self.username and self.password):
        return {}

    # New 16-hex-character salt on every call.
    salt = secrets.token_hex(8)
    salted_password = (self.password + salt).encode()
    params = {}
    params['u'] = self.username
    params['t'] = hashlib.md5(salted_password).hexdigest()
    params['s'] = salt
    params['v'] = '1.16.1'  # API version
    params['c'] = 'SoulSync'  # Client name
    params['f'] = 'json'  # Response format
    return params
# Fixed salt for cover-art URLs ONLY. Subsonic token auth (t=md5(password
# +salt), s=salt) does not require a unique salt per request, and a stable
# one makes the cover URL deterministic — so the image cache and the
# browser cache actually HIT. The rotating salt from _generate_auth_params
# would make every request a unique URL → cache miss every time + a dead,
# never-reused cache row per fetch (#766 review). The password is never
# exposed either way (only its salted md5).
_COVER_ART_SALT = 'soulsync-cover'

def build_cover_art_url(self, cover_id, size=None) -> Optional[str]:
    """Return an absolute, authenticated ``getCoverArt`` URL for ``cover_id``.

    The fixed salt makes the URL identical on every call for the same
    server, credentials, id and size, so it can be cached. The web layer
    proxies this to the browser (sync editor + modals). #766: the
    ``/api/navidrome/cover/<id>`` route had no working URL behind it, so
    every Navidrome cover came back blank.

    Args:
        cover_id: Cover-art id; stringified into the ``id`` parameter.
        size: Optional value for the ``size`` parameter; omitted when falsy.

    Returns:
        The URL, or ``None`` when ``base_url``, ``cover_id``, ``username``
        or ``password`` is missing.
    """
    have_target = bool(self.base_url) and bool(cover_id)
    if not have_target:
        return None
    have_credentials = bool(self.username) and bool(self.password)
    if not have_credentials:
        return None

    salt = self._COVER_ART_SALT
    digest = hashlib.md5((self.password + salt).encode())
    query = {
        'u': self.username,
        't': digest.hexdigest(),
        's': salt,
        'v': '1.16.1',
        'c': 'SoulSync',
        'f': 'json',  # harmless for getCoverArt — it returns image binary
        'id': str(cover_id),
    }
    if size:
        query['size'] = str(size)
    return self.base_url + "/rest/getCoverArt?" + urlencode(query)
def build_stream_url(self, track_id, max_bitrate=0) -> Optional[str]:
    """Return an absolute, authenticated ``/rest/stream`` URL for a song.

    Lets SoulSync play a Navidrome library track by proxying the server's
    own stream API — so playback works WITHOUT mounting the music into the
    SoulSync container (#809: SoulSync otherwise reads library files off
    disk, which fails when the user hasn't mirror-mounted the library).

    Args:
        track_id: Song id; stringified into the ``id`` parameter.
        max_bitrate: When it converts to a positive int it is sent as
            ``maxBitRate``; 0 (the default) leaves the parameter out, i.e.
            no transcode (original file).

    Returns:
        The URL, or ``None`` when ``base_url``, ``track_id``, ``username``
        or ``password`` is missing.
    """
    if not (self.base_url and track_id):
        return None
    if not (self.username and self.password):
        return None

    # A fresh random salt per URL.
    salt = secrets.token_hex(8)
    token = hashlib.md5((self.password + salt).encode()).hexdigest()
    query = {
        'u': self.username,
        't': token,
        's': salt,
        'v': '1.16.1',
        'c': 'SoulSync',
        'id': str(track_id),
    }
    bitrate = int(max_bitrate) if max_bitrate else 0
    if bitrate > 0:
        query['maxBitRate'] = str(bitrate)
    return f"{self.base_url}/rest/stream?{urlencode(query)}"
# Subsonic endpoints that modify data — sent as POST so long parameter
# lists do not run into URL length limits.
_WRITE_ENDPOINTS = frozenset({
    'createPlaylist', 'updatePlaylist', 'deletePlaylist',
    'star', 'unstar', 'scrobble', 'setRating',
    'createShare', 'updateShare', 'deleteShare',
    'createUser', 'updateUser', 'deleteUser',
    'createBookmark', 'deleteBookmark',
    'startScan',
})

def _make_request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
    """Send one authenticated request to ``/rest/<endpoint>``.

    Endpoints listed in ``_WRITE_ENDPOINTS`` are POSTed with the
    parameters as form data (30 s timeout); everything else is a GET with
    query parameters (60 s timeout).

    Args:
        endpoint: Subsonic method name, e.g. ``'ping'``.
        params: Extra parameters; merged over the auth parameters.

    Returns:
        The ``subsonic-response`` object of the reply, or ``None`` when the
        client has no ``base_url``/``username``, the HTTP request fails,
        the body is not valid JSON, or the API reports ``status: failed``.
    """
    if not (self.base_url and self.username):
        return None

    url = f"{self.base_url}/rest/{endpoint}"
    # Auth parameters first, so caller-supplied keys override on a clash.
    query = self._generate_auth_params()
    if params:
        query.update(params)

    try:
        # Use POST for write operations to avoid URL length limits
        # (e.g., createPlaylist with 161 songId params would exceed GET URL limits)
        if endpoint in self._WRITE_ENDPOINTS:
            http_reply = requests.post(url, data=query, timeout=30)
        else:
            http_reply = requests.get(url, params=query, timeout=60)
        http_reply.raise_for_status()

        envelope = http_reply.json().get('subsonic-response', {})
        # Subsonic reports API-level errors inside the response envelope.
        if envelope.get('status') == 'failed':
            error_message = envelope.get('error', {}).get('message', 'Unknown error')
            logger.error(f"Navidrome API error: {error_message}")
            return None
        return envelope
    except requests.exceptions.RequestException as e:
        logger.error(f"Navidrome API request failed for {url}: {e}")
        return None
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse Navidrome response: {e}")
        return None
def is_connected(self) -> bool:
    """Report whether base URL, username and password are all set.

    When any of the three is missing or empty and no connect is already
    in progress, a (throttled) ensure_connection() attempt runs first, so
    the status self-heals instead of staying latched disconnected until a
    manual Test.
    """
    configured = self.base_url and self.username and self.password
    if not configured and not self._is_connecting:
        self.ensure_connection()
    # Re-read the attributes: the reconnect attempt may have changed them.
    return all(
        value is not None
        for value in (self.base_url, self.username, self.password)
    )
def get_all_artists(self) -> List[NavidromeArtist]:
    """Fetch every artist (``getArtists``), restricted to the selected music folder if any.

    Each artist is also stored in ``_artist_cache`` under its ``ratingKey``.
    Progress messages go to the registered callback when one is set.

    Returns:
        The wrapped artists; ``[]`` when not connected, on an empty
        response, or on any error.
    """
    if not self.ensure_connection():
        logger.error("Not connected to Navidrome server")
        return []

    def report(message):
        # Forward a progress message only when a callback is registered.
        if self._progress_callback:
            self._progress_callback(message)

    try:
        report("Fetching artists from Navidrome...")

        query = {'musicFolderId': self.music_folder_id} if self.music_folder_id else None
        payload = self._make_request('getArtists', query)
        if not payload:
            return []

        report("Processing artist data...")

        collected = []
        # The response groups artists under index entries.
        groups = payload.get('artists', {}).get('index', [])
        group_count = len(groups)
        for position, group in enumerate(groups):
            if self._progress_callback and group_count > 1:
                percent = int((position / group_count) * 100)
                self._progress_callback(f"Processing artist index {position+1}/{group_count} ({percent}%)")
            for raw_artist in group.get('artist', []):
                wrapped = NavidromeArtist(raw_artist, self)
                # Cache the artist for quick lookup
                self._artist_cache[wrapped.ratingKey] = wrapped
                collected.append(wrapped)

        report(f"Retrieved {len(collected)} artists from Navidrome")
        logger.info(f"Retrieved {len(collected)} artists from Navidrome")
        return collected
    except Exception as e:
        logger.error(f"Error getting artists from Navidrome: {e}")
        return []
def get_all_artist_ids(self) -> set:
    """Return the ids of all artists as strings (lightweight, for removal detection).

    No wrapper objects are built and nothing is cached. Returns an empty
    set when not connected, on an empty response, or on any error.
    """
    if not self.ensure_connection():
        return set()
    try:
        query = {'musicFolderId': self.music_folder_id} if self.music_folder_id else None
        payload = self._make_request('getArtists', query)
        if not payload:
            return set()

        found = {
            str(entry.get('id'))
            for group in payload.get('artists', {}).get('index', [])
            for entry in group.get('artist', [])
            if entry.get('id')
        }
        logger.info(f"Retrieved {len(found)} artist IDs from Navidrome (lightweight)")
        return found
    except Exception as e:
        logger.error(f"Error getting artist IDs from Navidrome: {e}")
        return set()
def get_all_album_ids(self) -> set:
    """Return the ids of all albums as strings (lightweight, paginated, for removal detection).

    Pages through ``getAlbumList2`` 500 albums at a time until a page is
    empty or short.
    NOTE(review): a failed page request ends the loop and the ids gathered
    so far are returned — confirm callers tolerate a partial set.
    """
    if not self.ensure_connection():
        return set()
    try:
        collected = set()
        batch = 500
        start = 0
        while True:
            query = {'type': 'alphabeticalByName', 'size': batch, 'offset': start}
            if self.music_folder_id:
                query['musicFolderId'] = self.music_folder_id

            page = self._make_request('getAlbumList2', query)
            entries = page.get('albumList2', {}).get('album', []) if page else []
            if not entries:
                break

            collected.update(str(entry.get('id')) for entry in entries if entry.get('id'))

            # A short page means this was the last one.
            if len(entries) < batch:
                break
            start += batch

        logger.info(f"Retrieved {len(collected)} album IDs from Navidrome (lightweight)")
        return collected
    except Exception as e:
        logger.error(f"Error getting album IDs from Navidrome: {e}")
        return set()
def get_albums_for_artist(self, artist_id: str) -> List['NavidromeAlbum']:
    """Return all albums for ``artist_id``, cached per artist.

    When a music folder is selected, albums outside it are filtered out
    using the folder album index (if that index could be built).

    Args:
        artist_id: Navidrome artist id.

    Returns:
        The wrapped albums; ``[]`` when not connected, on an empty
        response, or on any error. Only successful results are cached.
    """
    # Check cache first
    if artist_id in self._album_cache:
        return self._album_cache[artist_id]
    if not self.ensure_connection():
        return []
    try:
        if self._progress_callback:
            # The artist name only feeds the progress message, so it is
            # resolved only when someone listens. The cache is keyed by id:
            # try the direct lookup and fall back to a scan for entries
            # stored under a different key.
            known_artists = getattr(self, '_artist_cache', None) or {}
            cached_artist = known_artists.get(artist_id)
            if cached_artist is None or getattr(cached_artist, 'ratingKey', None) != artist_id:
                cached_artist = next(
                    (a for a in known_artists.values()
                     if getattr(a, 'ratingKey', None) == artist_id),
                    None,
                )
            artist_name = "Unknown Artist"
            if cached_artist is not None:
                artist_name = getattr(cached_artist, 'title', 'Unknown Artist')
            self._progress_callback(f"Fetching albums for artist {artist_name}...")

        params = {'id': artist_id}
        if self.music_folder_id:
            params['musicFolderId'] = self.music_folder_id
        response = self._make_request('getArtist', params)
        if not response:
            return []

        album_list = response.get('artist', {}).get('album', [])
        if self._progress_callback and album_list:
            self._progress_callback(f"Processing {len(album_list)} albums...")
        albums = [NavidromeAlbum(album_data, self) for album_data in album_list]

        # Filter to selected music folder if Navidrome didn't handle musicFolderId
        # on getArtist (not all Subsonic implementations support it on this endpoint)
        if self.music_folder_id and albums:
            folder_ids = self._get_folder_album_ids()
            if folder_ids is not None:
                before = len(albums)
                albums = [a for a in albums if a.ratingKey in folder_ids]
                if len(albums) < before:
                    logger.debug(f"Filtered {before - len(albums)} albums outside selected music folder for artist {artist_id}")

        # Cache the result
        self._album_cache[artist_id] = albums
        return albums
    except Exception as e:
        logger.error(f"Error getting albums for artist {artist_id}: {e}")
        return []
def _get_folder_album_ids(self) -> Optional[set]:
    """Return the set of album ids that belong to the selected music folder.

    The set is built by paging through ``getAlbumList2`` with
    ``musicFolderId`` and cached in ``_folder_album_ids`` for reuse.

    Returns:
        The id set, or ``None`` when no folder is selected or the index
        could not be built. ``None`` tells the caller not to filter.
    """
    if self._folder_album_ids is not None:
        return self._folder_album_ids
    if not self.music_folder_id:
        return None
    try:
        album_ids = set()
        offset = 0
        page_size = 500
        while True:
            params = {
                'type': 'alphabeticalByArtist',
                'size': page_size,
                'offset': offset,
                'musicFolderId': self.music_folder_id
            }
            response = self._make_request('getAlbumList2', params)
            if not response:
                # The request failed (already logged by _make_request).
                # Caching the partial — possibly empty — set would make
                # get_albums_for_artist drop real albums until the next
                # config reload, so report "no index" and retry next time.
                return None
            album_list = response.get('albumList2', {}).get('album', [])
            if not album_list:
                break
            for album_data in album_list:
                aid = album_data.get('id', '')
                if aid:
                    album_ids.add(aid)
            # A short page means this was the last one.
            if len(album_list) < page_size:
                break
            offset += page_size
        self._folder_album_ids = album_ids
        logger.info(f"Built music folder album index: {len(album_ids)} albums in selected folder")
        return album_ids
    except Exception as e:
        logger.warning(f"Could not build music folder album index: {e}")
        return None
def get_recently_added_albums(self, limit: int = 400) -> List[NavidromeAlbum]:
    """Return up to ``limit`` albums from ``getAlbumList2`` with ``type=newest``.

    Pages are requested ``min(limit, 500)`` albums at a time, restricted
    to the selected music folder if any.

    Returns:
        At most ``limit`` wrapped albums; ``[]`` when not connected or on
        any error.
    """
    if not self.ensure_connection():
        return []
    try:
        newest = []
        batch = min(limit, 500)
        start = 0
        while len(newest) < limit:
            query = {'type': 'newest', 'size': batch, 'offset': start}
            if self.music_folder_id:
                query['musicFolderId'] = self.music_folder_id

            page = self._make_request('getAlbumList2', query)
            entries = page.get('albumList2', {}).get('album', []) if page else []
            if not entries:
                break

            newest.extend(NavidromeAlbum(entry, self) for entry in entries)
            if len(entries) < batch:
                break  # No more pages
            start += batch

        logger.info(f"Retrieved {len(newest)} recently added albums from Navidrome")
        return newest[:limit]
    except Exception as e:
        logger.error(f"Error getting recently added albums from Navidrome: {e}")
        return []
def get_tracks_for_album(self, album_id: str) -> List['NavidromeTrack']:
    """Return all tracks for ``album_id``, cached per album.

    Args:
        album_id: Navidrome album id.

    Returns:
        The wrapped tracks; ``[]`` when not connected, on an empty
        response, or on any error. Only successful results are cached.
    """
    # Check cache first
    if album_id in self._track_cache:
        return self._track_cache[album_id]
    if not self.ensure_connection():
        return []
    try:
        if self._progress_callback:
            # The album name only feeds the progress message. Finding it
            # means scanning every cached album, so the scan is skipped
            # when nobody listens.
            album_name = "Unknown Album"
            cached_albums = getattr(self, '_album_cache', None) or {}
            for artist_albums in cached_albums.values():
                for cached_album in artist_albums:
                    if getattr(cached_album, 'ratingKey', None) == album_id:
                        album_name = getattr(cached_album, 'title', 'Unknown Album')
                        break
                if album_name != "Unknown Album":
                    break
            self._progress_callback(f"Fetching tracks for album {album_name}...")

        response = self._make_request('getAlbum', {'id': album_id})
        if not response:
            return []

        track_list = response.get('album', {}).get('song', [])
        if self._progress_callback and track_list:
            self._progress_callback(f"Processing {len(track_list)} tracks...")
        tracks = [NavidromeTrack(track_data, self) for track_data in track_list]

        # Cache the result
        self._track_cache[album_id] = tracks
        return tracks
    except Exception as e:
        logger.error(f"Error getting tracks for album {album_id}: {e}")
        return []
def get_artist_by_id(self, artist_id: str) -> Optional['NavidromeArtist']:
    """Return the artist with ``artist_id``, from the cache when possible.

    A freshly fetched artist is stored in ``_artist_cache``. Returns
    ``None`` when not connected, when the response carries no ``artist``
    entry, or on any error.
    """
    try:
        return self._artist_cache[artist_id]
    except KeyError:
        pass  # Not cached yet; fetch it from the server.

    if not self.ensure_connection():
        return None
    try:
        payload = self._make_request('getArtist', {'id': artist_id})
        if not payload or 'artist' not in payload:
            return None
        wrapped = NavidromeArtist(payload['artist'], self)
        # Cache for future use
        self._artist_cache[artist_id] = wrapped
        return wrapped
    except Exception as e:
        logger.error(f"Error getting artist {artist_id}: {e}")
        return None
def get_album_by_id(self, album_id: str) -> Optional['NavidromeAlbum']:
    """Fetch the album with ``album_id`` via ``getAlbum`` (not cached).

    Returns ``None`` when not connected, when the response carries no
    ``album`` entry, or on any error.
    """
    if not self.ensure_connection():
        return None
    try:
        payload = self._make_request('getAlbum', {'id': album_id})
        if not payload or 'album' not in payload:
            return None
        return NavidromeAlbum(payload['album'], self)
    except Exception as e:
        logger.error(f"Error getting album {album_id}: {e}")
        return None
def get_library_stats(self) -> Dict[str, int]:
    """Return library counts keyed by ``artists``, ``albums`` and ``tracks``.

    Only the artist count is real (one get_all_artists() call). Album and
    track counts would require walking every artist, so they are reported
    as 0 for now.

    Returns:
        The stats dict, or ``{}`` when not connected or on any error.
    """
    if not self.ensure_connection():
        return {}
    try:
        artist_total = len(self.get_all_artists())
        # TODO: Implement more efficient counting if Navidrome provides bulk stats
        return {'artists': artist_total, 'albums': 0, 'tracks': 0}
    except Exception as e:
        logger.error(f"Error getting library stats: {e}")
        return {}
def get_play_history(self, limit=500):
    """Collect recently played tracks via the Subsonic API.

    Requests ``getAlbumList2`` with ``type=recent`` (``size`` is
    ``min(limit, 500)``), then fetches the first 50 of those albums one by
    one and keeps every song that carries a ``played`` value.

    Returns:
        A list of dicts with ``track_title``, ``artist``, ``album``,
        ``played_at``, ``duration_ms`` and ``track_id``; ``[]`` when not
        connected, on an empty response, or on any error.
    """
    if not self.ensure_connection():
        return []

    def as_list(value):
        # A single item may arrive as a bare object instead of a list.
        if isinstance(value, list):
            return value
        return [value] if value else []

    try:
        listing = self._make_request('getAlbumList2', {
            'type': 'recent',
            'size': min(limit, 500),
        })
        if not listing:
            return []

        recent_albums = as_list(listing.get('albumList2', {}).get('album', []))
        history = []
        for album in recent_albums[:50]:  # Limit album fetches to avoid API spam
            album_id = album.get('id')
            if not album_id:
                continue
            detail = self._make_request('getAlbum', {'id': album_id})
            if not detail:
                continue
            for song in as_list(detail.get('album', {}).get('song', [])):
                played_at = song.get('played')
                if not played_at:
                    continue
                history.append({
                    'track_title': song.get('title', ''),
                    'artist': song.get('artist', ''),
                    'album': song.get('album', ''),
                    'played_at': played_at,
                    'duration_ms': int(song.get('duration', 0)) * 1000,
                    'track_id': song.get('id', ''),
                })

        logger.info(f"Retrieved {len(history)} play history entries from Navidrome")
        return history
    except Exception as e:
        logger.error(f"Error getting Navidrome play history: {e}")
        return []
def get_track_play_counts(self):
    """Collect per-track play counts via the Subsonic API.

    Requests ``getAlbumList2`` with ``type=frequent`` (up to 500 albums),
    then fetches the first 100 of those albums one by one and reads
    ``playCount`` from each song.

    Returns:
        ``{track_id: play_count}`` for songs with a positive count; ``{}``
        when not connected, on an empty response, or on any error.
    """
    if not self.ensure_connection():
        return {}
    try:
        response = self._make_request('getAlbumList2', {
            'type': 'frequent',
            'size': 500,
        })
        if not response:
            return {}
        album_list = response.get('albumList2', {}).get('album', [])
        # A single album may arrive as a bare object instead of a list.
        if not isinstance(album_list, list):
            album_list = [album_list] if album_list else []
        counts = {}
        for album in album_list[:100]:  # Cap the number of per-album requests
            album_id = album.get('id')
            if not album_id:
                continue
            album_resp = self._make_request('getAlbum', {'id': album_id})
            if not album_resp:
                continue
            songs = album_resp.get('album', {}).get('song', [])
            if not isinstance(songs, list):
                songs = [songs] if songs else []
            for song in songs:
                pc = song.get('playCount', 0)
                if pc and pc > 0:
                    counts[song.get('id', '')] = pc
        logger.info(f"Retrieved play counts for {len(counts)} tracks from Navidrome")
        return counts
    except Exception as e:
        logger.error(f"Error getting Navidrome track play counts: {e}")
        return {}
    # The unreachable trailing ``return {}`` was removed: both the try body
    # and the handler above already return on every path.
def get_all_playlists(self) -> List[PlaylistInfo]:
    """Return every playlist reported by ``getPlaylists``, without tracks.

    Each entry's ``tracks`` list is left empty here; use
    get_playlist_tracks() to load the songs of a playlist.

    Returns:
        The playlists; ``[]`` when not connected, on an empty response, or
        on any error.
    """
    if not self.ensure_connection():
        return []
    try:
        payload = self._make_request('getPlaylists')
        if not payload:
            return []

        found = [
            PlaylistInfo(
                id=entry.get('id', ''),
                title=entry.get('name', 'Unknown Playlist'),
                description=entry.get('comment'),
                duration=entry.get('duration', 0) * 1000,  # Convert to milliseconds
                leaf_count=entry.get('songCount', 0),
                tracks=[]  # Will be populated when needed
            )
            for entry in payload.get('playlists', {}).get('playlist', [])
        ]
        logger.info(f"Retrieved {len(found)} playlists from Navidrome")
        return found
    except Exception as e:
        logger.error(f"Error getting playlists from Navidrome: {e}")
        return []
def get_playlist_by_name(self, name: str) -> Optional['PlaylistInfo']:
    """Return the first playlist whose title equals ``name`` ignoring case, else ``None``."""
    return next(
        (
            candidate
            for candidate in self.get_all_playlists()
            if candidate.title.lower() == name.lower()
        ),
        None,
    )
def create_playlist(self, name: str, tracks, playlist_id: str = None) -> bool:
    """Create a playlist, or update an existing one when ``playlist_id`` is given.

    Args:
        name: Playlist name.
        tracks: Iterable of objects carrying the song id as ``ratingKey``
            or, failing that, ``id``; objects with neither are skipped.
        playlist_id: When provided it is sent as ``playlistId``, which
            acts as an overwrite/update of that playlist.

    Returns:
        True when the server answers with status ``ok``; False when not
        connected, no usable track ids were found, or the request failed.
    """
    if not self.ensure_connection():
        return False
    try:
        # Convert tracks to Navidrome track IDs
        song_ids = []
        for item in tracks:
            for attr in ('ratingKey', 'id'):
                if hasattr(item, attr):
                    song_ids.append(str(getattr(item, attr)))
                    break

        if not song_ids:
            logger.warning(f"No valid tracks provided for playlist '{name}'")
            return False

        logger.info(f"{'Updating' if playlist_id else 'Creating'} Navidrome playlist '{name}' with {len(song_ids)} tracks")

        request_params = {
            'name': name,
            'songId': song_ids  # Subsonic API accepts multiple songId parameters
        }
        if playlist_id:
            request_params['playlistId'] = playlist_id

        reply = self._make_request('createPlaylist', request_params)
        if not (reply and reply.get('status') == 'ok'):
            logger.error(f"Failed to {'update' if playlist_id else 'create'} Navidrome playlist '{name}'")
            return False

        logger.info(f"{'Updated' if playlist_id else 'Created'} Navidrome playlist '{name}' with {len(song_ids)} tracks")
        return True
    except Exception as e:
        logger.error(f"Error {'updating' if playlist_id else 'creating'} Navidrome playlist '{name}': {e}")
        return False
def copy_playlist(self, source_name: str, target_name: str) -> bool:
    """Copy a playlist's tracks into a playlist named *target_name* (backup).

    The source is looked up with ``get_playlist_by_name`` and its tracks are
    read with ``get_playlist_tracks``. An existing playlist named
    *target_name* is deleted first (best effort) so the backup is
    overwritten, then a new playlist is created via ``create_playlist``.

    Returns:
        True when the backup playlist was created. False when the client is
        not connected, the source playlist is missing or has no tracks, the
        creation fails, or an exception is raised.
    """
    if not self.ensure_connection():
        return False
    try:
        # Get the source playlist
        source_playlist = self.get_playlist_by_name(source_name)
        if not source_playlist:
            logger.error(f"Source playlist '{source_name}' not found")
            return False
        # Get tracks from source playlist
        source_tracks = self.get_playlist_tracks(source_playlist.id)
        logger.debug(f"Retrieved {len(source_tracks) if source_tracks else 0} tracks from source playlist")
        # Validate tracks
        if not source_tracks:
            logger.warning(f"Source playlist '{source_name}' has no tracks to copy")
            return False
        # Delete target playlist if it exists (for overwriting backup).
        # Best effort: a failure here never aborts the copy.
        try:
            target_playlist = self.get_playlist_by_name(target_name)
            if target_playlist:
                delete_response = self._make_request('deletePlaylist', {'id': target_playlist.id})
                # Only claim the delete happened when the server confirmed it.
                if delete_response and delete_response.get('status') == 'ok':
                    logger.info(f"Deleted existing backup playlist '{target_name}'")
                else:
                    logger.warning(
                        f"Could not delete existing backup playlist '{target_name}'; "
                        f"a duplicate backup may be created"
                    )
        except Exception as e:
            logger.debug("backup playlist precheck: %s", e)
        # Create new playlist with copied tracks
        try:
            if self.create_playlist(target_name, source_tracks):
                logger.info(f"Created backup playlist '{target_name}' with {len(source_tracks)} tracks")
                return True
            logger.error(f"Failed to create backup playlist '{target_name}'")
            return False
        except Exception as create_error:
            logger.error(f"Failed to create backup playlist: {create_error}")
            return False
    except Exception as e:
        logger.error(f"Error copying playlist '{source_name}' to '{target_name}': {e}")
        return False
def get_playlist_tracks(self, playlist_id: str) -> List[NavidromeTrack]:
    """Fetch the entries of one playlist as ``NavidromeTrack`` wrappers.

    Issues a ``getPlaylist`` request for *playlist_id* and wraps every item
    under ``playlist.entry`` in the response. An empty list is returned when
    the client is not connected, the response is falsy, or an exception is
    raised.
    """
    if not self.ensure_connection():
        return []
    try:
        payload = self._make_request('getPlaylist', {'id': playlist_id})
        if not payload:
            return []
        entries = payload.get('playlist', {}).get('entry', [])
        wrapped = [NavidromeTrack(entry, self) for entry in entries]
        logger.debug(f"Retrieved {len(wrapped)} tracks from playlist {playlist_id}")
        return wrapped
    except Exception as e:
        logger.error(f"Error getting tracks for playlist {playlist_id}: {e}")
        return []
def get_playlists_by_name(self, name: str) -> List[PlaylistInfo]:
    """Return every playlist whose title equals *name*, ignoring case.

    Results keep the order in which ``get_all_playlists`` returned them; the
    list is empty when nothing matches.
    """
    return [
        candidate
        for candidate in self.get_all_playlists()
        if candidate.title.lower() == name.lower()
    ]
def append_to_playlist(self, playlist_name: str, tracks) -> bool:
    """Add tracks to an existing playlist, creating the playlist if missing.

    Unlike `update_playlist`, nothing already on the playlist is deleted,
    the playlist is never recreated and no backup is made. Sync mode
    'append' relies on this so tracks the user added on the server survive
    a re-sync. IDs already on the playlist are excluded, so only genuinely
    new tracks are sent.

    Returns True when the playlist was created, nothing new had to be added,
    or the server answered with status 'ok'; False otherwise.
    """
    if not self.ensure_connection():
        return False

    def _candidate_id(item):
        # ratingKey wins, then a truthy .id attribute, then a dict 'id' key.
        if hasattr(item, 'ratingKey'):
            return str(item.ratingKey)
        if hasattr(item, 'id'):
            return str(item.id) if item.id else None
        if isinstance(item, dict):
            return str(item.get('id') or '')
        return None

    try:
        matches = self.get_playlists_by_name(playlist_name)
        if not matches:
            logger.info(
                f"Navidrome append: playlist '{playlist_name}' doesn't exist yet — "
                f"creating with {len(tracks)} tracks"
            )
            return self.create_playlist(playlist_name, tracks)
        target = matches[0]
        # #823 round 2: the old dedupe read `t.id` — but NavidromeTrack only
        # defines `ratingKey`, so the existing-ids set was ALWAYS empty and
        # every sync re-appended the whole matched list (every track N
        # times). Same bug as the Jellyfin append; dedupe on ratingKey.
        present_keys = (
            str(getattr(entry, 'ratingKey', '') or '')
            for entry in self.get_playlist_tracks(target.id)
        )
        existing_ids = {key for key in present_keys if key}
        desired_ids = [tid for tid in map(_candidate_id, tracks) if tid]
        from core.sync.playlist_edit import plan_playlist_append
        additions = plan_playlist_append(existing_ids, desired_ids)
        if not additions:
            logger.info(
                f"Navidrome append: no new tracks to add to '{playlist_name}' "
                f"(all matched tracks already present)"
            )
            return True
        # Subsonic updatePlaylist: `songIdToAdd` accepts repeated values
        # (requests serializes list values as repeated query/form params).
        response = self._make_request(
            'updatePlaylist',
            {'playlistId': target.id, 'songIdToAdd': additions},
        )
        if not (response and response.get('status') == 'ok'):
            logger.error(
                f"Failed to append to Navidrome playlist '{playlist_name}'"
            )
            return False
        logger.info(
            f"Navidrome append: added {len(additions)} new tracks to "
            f"'{playlist_name}' (skipped {len(tracks) - len(additions)} "
            f"already present)"
        )
        return True
    except Exception as e:
        logger.error(f"Error appending to Navidrome playlist '{playlist_name}': {e}")
        return False
def reconcile_playlist(self, playlist_name: str, tracks) -> bool:
    """In-place reconcile (#792): add missing tracks and remove gone ones.

    Uses Subsonic updatePlaylist (songIdToAdd / songIndexToRemove) on the
    existing playlist object so its comment/identity survive — no
    delete/recreate. Creates the playlist if it is missing.

    Args:
        playlist_name: Name used for the lookup; the first match returned by
            ``get_playlists_by_name`` is reconciled.
        tracks: Desired tracks. Each contributes an ID from its ``ratingKey``
            attribute, else its ``id`` attribute, else (for dicts) its
            ``'id'`` key. Items without a usable ID are ignored.

    Returns:
        True when nothing had to change or the server answered with status
        'ok'. False on any failure so the caller can fall back to replace.
    """
    if not self.ensure_connection():
        return False

    def _as_id(value):
        # Falsy values (None, '') mean "no ID"; everything else is stringified.
        return str(value) if value else ''

    try:
        from core.sync.playlist_edit import plan_playlist_reconcile
        existing_playlists = self.get_playlists_by_name(playlist_name)
        if not existing_playlists:
            logger.info(f"Navidrome reconcile: '{playlist_name}' doesn't exist — creating")
            return self.create_playlist(playlist_name, tracks)
        primary = existing_playlists[0]
        existing_tracks = self.get_playlist_tracks(primary.id)
        # Playlist entries expose their ID as `ratingKey` (reading only `id`
        # left this list empty, so every reconcile re-added all tracks and
        # removed none). Keep ONE slot per playlist position — '' when an
        # entry has no ID — so enumerate() below yields real positions.
        positional_ids = [
            _as_id(getattr(t, 'ratingKey', None)) or _as_id(getattr(t, 'id', None))
            for t in existing_tracks
        ]
        current_ids = [cid for cid in positional_ids if cid]
        desired_ids = []
        for t in tracks:
            if hasattr(t, 'ratingKey'):
                tid = _as_id(t.ratingKey)
            elif isinstance(t, dict):
                tid = _as_id(t.get('id'))
            else:
                tid = _as_id(getattr(t, 'id', None))
            if tid:
                desired_ids.append(tid)
        plan = plan_playlist_reconcile(current_ids, desired_ids)
        if not plan['add'] and not plan['remove']:
            return True
        params = {'playlistId': primary.id}
        if plan['add']:
            params['songIdToAdd'] = plan['add']
        if plan['remove']:
            # Indices into the CURRENT list; remove descending so earlier
            # removals don't shift the indices of later ones.
            remove_set = set(plan['remove'])
            params['songIndexToRemove'] = sorted(
                (i for i, cid in enumerate(positional_ids) if cid in remove_set),
                reverse=True,
            )
        response = self._make_request('updatePlaylist', params)
        if response and response.get('status') == 'ok':
            logger.info(
                f"Navidrome reconcile '{playlist_name}': +{len(plan['add'])} / "
                f"-{len(plan['remove'])} (playlist preserved)"
            )
            return True
        logger.error(f"Navidrome reconcile failed for '{playlist_name}'")
        return False
    except Exception as e:
        logger.error(f"Error reconciling Navidrome playlist '{playlist_name}': {e}")
        return False
def update_playlist(self, playlist_name: str, tracks) -> bool:
    """Overwrite the playlist named *playlist_name*, creating it if absent.

    When several playlists share the name, the first one is overwritten and
    the rest are deleted. If the ``playlist_sync.create_backup`` setting is
    enabled (default True) and a playlist already exists, it is first copied
    to "<name> Backup"; a failed backup is only logged and the sync goes on.

    Returns the result of ``create_playlist``, or False when the client is
    not connected or an exception is raised.
    """
    if not self.ensure_connection():
        return False
    try:
        # Collect ALL playlists carrying this name so duplicates can be handled
        same_name = self.get_playlists_by_name(playlist_name)
        # Backup behaviour is driven by configuration
        from config.settings import config_manager
        backup_wanted = config_manager.get('playlist_sync.create_backup', True)
        if not same_name:
            # Nothing to overwrite: plain creation
            logger.info(f"Creating new playlist '{playlist_name}'")
            return self.create_playlist(playlist_name, tracks)
        if backup_wanted:
            backup_name = f"{playlist_name} Backup"
            logger.info(f"Creating backup playlist '{backup_name}' before sync")
            # One backup is enough even when duplicates exist
            backed_up = self.copy_playlist(playlist_name, backup_name)
            if backed_up:
                logger.info("Backup created successfully")
            else:
                logger.warning("Failed to create backup, continuing with sync")
        # STRATEGY: keep the first match, drop every other one
        primary_playlist, *duplicates = same_name
        if duplicates:
            logger.info(f"Found {len(duplicates)} duplicate playlists for '{playlist_name}'. Cleaning them up...")
        for dup in duplicates:
            try:
                self._make_request('deletePlaylist', {'id': dup.id})
                logger.info(f"Deleted duplicate playlist '{playlist_name}' (ID: {dup.id})")
            except Exception as del_err:
                logger.error(f"Error deleting duplicate playlist '{playlist_name}': {del_err}")
        # Overwrite the surviving playlist by passing its playlistId
        logger.info(f"Updating existing playlist '{playlist_name}' (ID: {primary_playlist.id})")
        return self.create_playlist(playlist_name, tracks, playlist_id=primary_playlist.id)
    except Exception as e:
        logger.error(f"Error updating Navidrome playlist '{playlist_name}': {e}")
        return False
def trigger_library_scan(self, library_name: str = "Music") -> bool:
    """Ask Navidrome to start a library scan via the Subsonic startScan endpoint.

    *library_name* is accepted but not used by this implementation.
    Returns True when the request yields any non-None result, False when it
    yields None or raises.
    """
    try:
        if self._make_request('startScan') is None:
            logger.warning("Navidrome startScan returned no response")
            return False
        logger.info("Navidrome library scan triggered")
        return True
    except Exception as e:
        logger.error(f"Failed to trigger Navidrome scan: {e}")
        return False
def is_library_scanning(self, library_name: str = "Music") -> bool:
    """Report whether a scan is running, using the Subsonic getScanStatus endpoint.

    *library_name* is accepted but not used by this implementation.
    Returns the ``scanning`` value found under ``scanStatus`` in the result
    (False when absent), and False when the request yields None or raises.
    """
    try:
        status_response = self._make_request('getScanStatus')
        if status_response is None:
            return False
        return status_response.get('scanStatus', {}).get('scanning', False)
    except Exception:
        return False
# Metadata update methods for compatibility with metadata updater
def update_artist_genres(self, artist, genres: List[str]):
    """Update artist genres - not implemented for Navidrome.

    Only a debug message is logged; *genres* is ignored and True is always
    returned.
    """
    # No try/except here: the only thing that could fail was reading
    # `artist.title`, and the old handler read it again, so it re-raised
    # instead of returning False. Falling back to the object itself keeps
    # this stub from ever raising.
    logger.debug(
        "Genre update not implemented for Navidrome artist: %s",
        getattr(artist, 'title', artist),
    )
    return True
def update_artist_poster(self, artist, image_data: bytes):
    """Update artist poster image - not implemented for Navidrome.

    Only a debug message is logged; *image_data* is ignored and True is
    always returned.
    """
    # No try/except here: the only thing that could fail was reading
    # `artist.title`, and the old handler read it again, so it re-raised
    # instead of returning False. Falling back to the object itself keeps
    # this stub from ever raising.
    logger.debug(
        "Poster update not implemented for Navidrome artist: %s",
        getattr(artist, 'title', artist),
    )
    return True
def update_album_poster(self, album, image_data: bytes):
    """Update album poster image - not implemented for Navidrome.

    Only a debug message is logged; *image_data* is ignored and True is
    always returned.
    """
    # No try/except here: the only thing that could fail was reading
    # `album.title`, and the old handler read it again, so it re-raised
    # instead of returning False. Falling back to the object itself keeps
    # this stub from ever raising.
    logger.debug(
        "Poster update not implemented for Navidrome album: %s",
        getattr(album, 'title', album),
    )
    return True
def update_artist_biography(self, artist) -> bool:
    """Update artist biography - not implemented for Navidrome.

    Only a debug message is logged and True is always returned.
    """
    # No try/except here: the only thing that could fail was reading
    # `artist.title`, and the old handler read it again, so it re-raised
    # instead of returning False. Falling back to the object itself keeps
    # this stub from ever raising.
    logger.debug(
        "Biography update not implemented for Navidrome artist: %s",
        getattr(artist, 'title', artist),
    )
    return True
def needs_update_by_age(self, artist, refresh_interval_days: int) -> bool:
    """Check if artist needs updating based on age threshold - simplified for Navidrome.

    Both arguments are ignored: there is no timestamp tracking yet, so every
    artist is reported as needing an update.
    """
    # The former try/except wrapped a bare `return True`; its handler could
    # never run, so it was removed.
    return True
def is_artist_ignored(self, artist) -> bool:
    """Check if artist is manually marked to be ignored - simplified for Navidrome.

    The argument is ignored: for now no artist is treated as ignored, so the
    result is always False.
    """
    # The former try/except wrapped a bare `return False`; its handler could
    # never run, so it was removed.
    return False
def parse_update_timestamp(self, artist) -> Optional[datetime]:
    """Parse the last update timestamp from artist summary - not implemented for Navidrome.

    The argument is ignored and None is always returned.
    """
    # The former try/except wrapped a bare `return None`; its handler could
    # never run, so it was removed.
    return None
def get_cache_stats(self):
    """Return the sizes of the client's caches for debugging/logging.

    The ``bulk_*`` keys repeat the album and track counts; they exist for
    compatibility with the Jellyfin interface.
    """
    artist_total = len(self._artist_cache)
    album_total = len(self._album_cache)
    track_total = len(self._track_cache)
    stats = {}
    stats['artists_cached'] = artist_total
    stats['albums_cached'] = album_total
    stats['tracks_cached'] = track_total
    stats['bulk_albums_cached'] = album_total  # For compatibility with Jellyfin interface
    stats['bulk_tracks_cached'] = track_total  # For compatibility with Jellyfin interface
    return stats
def clear_cache(self):
    """Empty the artist, album and track caches so the next request refetches data."""
    # Attributes are resolved one at a time, in the same order as before.
    for cache_attr in ('_artist_cache', '_album_cache', '_track_cache'):
        getattr(self, cache_attr).clear()
    logger.info("Navidrome client cache cleared")
def search_tracks(self, title: str, artist: str, limit: int = 15) -> List[TrackInfo]:
    """Search songs through the Subsonic search3 endpoint.

    The query is "<artist> <title>" with surrounding whitespace stripped;
    only songs are requested (artist and album counts are 0) and at most
    *limit* of them. When ``self.music_folder_id`` is set, the search is
    restricted to that music folder.

    Returns a list of ``TrackInfo`` (durations converted to milliseconds),
    each carrying the wrapped source track in ``_original_navidrome_track``.
    An empty list is returned when the client is not connected, the response
    is falsy, or an exception is raised.
    """
    if not self.ensure_connection():
        logger.warning("Navidrome not connected. Cannot perform search.")
        return []
    try:
        # Subsonic search3 request, songs only
        search_params = {
            'query': f"{artist} {title}".strip(),
            'songCount': limit,
            'artistCount': 0,
            'albumCount': 0,
        }
        if self.music_folder_id:
            search_params['musicFolderId'] = self.music_folder_id
        payload = self._make_request('search3', search_params)
        if not payload:
            return []
        found = []
        for song in payload.get('searchResult3', {}).get('song', []):
            info = TrackInfo(
                id=song.get('id', ''),
                title=song.get('title', ''),
                artist=song.get('artist', ''),
                album=song.get('album', ''),
                duration=song.get('duration', 0) * 1000,  # seconds -> milliseconds
                track_number=song.get('track'),
                year=song.get('year'),
                rating=song.get('userRating'),
            )
            # Keep the wrapped source track around for playlist creation
            info._original_navidrome_track = NavidromeTrack(song, self)
            found.append(info)
        logger.info(f"Found {len(found)} tracks for '{title}' by '{artist}'")
        return found
    except Exception as e:
        logger.error(f"Error searching for tracks: {e}")
        return []