import copy import json import os import sqlite3 import time from typing import Dict, Any, Optional from cryptography.fernet import Fernet, InvalidToken from pathlib import Path from utils.logging_config import get_logger logger = get_logger("config") class ConfigManager: _VALID_LOG_LEVELS = frozenset({"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}) def __init__(self, config_path: str = "config/config.json"): # Determine strict absolute path to settings.py directory to help resolve config.json # This handles cases where CWD is different (e.g. running from /Users vs /Users/project) self.base_dir = Path(__file__).parent.parent.absolute() # Check for environment variable override first (Unified logic with web_server.py) env_config_path = os.environ.get('SOULSYNC_CONFIG_PATH') if env_config_path: config_path = env_config_path # Resolve config path if os.path.isabs(config_path): self.config_path = Path(config_path) else: # Try to resolve relative to CWD first (legacy behavior), then relative to project root cwd_path = Path(config_path) project_path = self.base_dir / config_path if cwd_path.exists(): self.config_path = cwd_path.absolute() elif project_path.exists(): self.config_path = project_path else: # Default to project path even if it doesn't exist yet (for creation/fallback) self.config_path = project_path logger.info(f"ConfigManager initialized with path: {self.config_path}") self.config_data: Dict[str, Any] = {} self._fernet: Optional[Fernet] = None # Use DATABASE_PATH env var, fallback to database/music_library.db db_path_env = os.environ.get('DATABASE_PATH') if db_path_env: self.database_path = Path(db_path_env) else: self.database_path = self.base_dir / "database" / "music_library.db" logger.info(f"Database path set to: {self.database_path}") self.load_config(str(self.config_path)) def load_config(self, config_path: str = None): """ Load configuration from database or file. Can be called to reload settings into the existing instance. """ if config_path: self.config_path = Path(config_path) self._load_config() # Dot-notation paths to sensitive config values that must be encrypted at rest. # Paths pointing to dicts encrypt the entire dict as a JSON blob. _SENSITIVE_PATHS = frozenset({ # Spotify 'spotify.client_id', 'spotify.client_secret', # Tidal 'tidal.client_id', 'tidal.client_secret', 'tidal_tokens', # full dict (access/refresh tokens) 'tidal_download.session', # full dict (access/refresh/expiry) # Qobuz 'qobuz.session', # full dict (app_id, app_secret, user_auth_token) # Media servers 'plex.token', 'jellyfin.api_key', 'navidrome.password', # Download sources 'soulseek.api_key', 'deezer_download.arl', 'lidarr_download.api_key', 'prowlarr.api_key', 'torrent_client.password', 'usenet_client.api_key', 'usenet_client.password', # Enrichment services 'listenbrainz.token', 'acoustid.api_key', 'lastfm.api_key', 'lastfm.api_secret', 'lastfm.session_key', 'genius.access_token', # Deezer OAuth 'deezer.app_id', 'deezer.app_secret', 'deezer.access_token', # Other 'hydrabase.api_key', 'discogs.token', }) def _get_fernet(self) -> Fernet: """Return a cached Fernet instance, creating the key file if needed.""" if self._fernet is not None: return self._fernet key_file = self.database_path.parent / ".encryption_key" # Migrate key from old location (config/) to new location (database/) old_key_file = self.config_path.parent / ".encryption_key" if not key_file.exists() and old_key_file.exists(): try: import shutil shutil.move(str(old_key_file), str(key_file)) logger.info(f"Moved encryption key to {key_file}") except Exception: key_file = old_key_file # Fall back to old location if key_file.exists(): with open(key_file, 'rb') as f: key = f.read() else: key = Fernet.generate_key() key_file.parent.mkdir(parents=True, exist_ok=True) with open(key_file, 'wb') as f: f.write(key) try: key_file.chmod(0o600) except OSError: pass # Windows may not support Unix permissions self._fernet = Fernet(key) return self._fernet def _encrypt_value(self, value) -> str: """Encrypt a config value (string or dict/list) into a Fernet token string.""" f = self._get_fernet() if isinstance(value, (dict, list)): plaintext = json.dumps(value) else: plaintext = str(value) return f.encrypt(plaintext.encode('utf-8')).decode('ascii') def _decrypt_value(self, value): """Decrypt a Fernet token string back to the original value. If value is not encrypted (migration), returns it unchanged.""" if not isinstance(value, str): return value # Fernet tokens always start with 'gAAAAA' if not value.startswith('gAAAAA'): return value try: f = self._get_fernet() decrypted = f.decrypt(value.encode('ascii')).decode('utf-8') # Only parse JSON for dicts/lists (starts with { or [). # Plain strings (including numeric ones like API keys) stay as strings. if decrypted and decrypted[0] in ('{', '['): try: return json.loads(decrypted) except (json.JSONDecodeError, ValueError): pass return decrypted except InvalidToken: # Key mismatch — encrypted with a different key (key file deleted/replaced) logger.error( "Failed to decrypt a config value — encryption key may have changed. " "Re-enter credentials in Settings or restore the original .encryption_key file." ) return value except Exception: return value def _encrypt_sensitive(self, config_data: Dict[str, Any]) -> Dict[str, Any]: """Return a deep copy of config_data with sensitive values encrypted.""" encrypted = copy.deepcopy(config_data) for path in self._SENSITIVE_PATHS: keys = path.split('.') # Navigate to the parent parent = encrypted for k in keys[:-1]: if isinstance(parent, dict) and k in parent: parent = parent[k] else: parent = None break if parent is None or not isinstance(parent, dict): continue leaf = keys[-1] if leaf not in parent: continue value = parent[leaf] # Skip empty values (no point encrypting empty strings/dicts) if not value and value != 0: continue # Skip already-encrypted values (idempotent) if isinstance(value, str) and value.startswith('gAAAAA'): continue parent[leaf] = self._encrypt_value(value) return encrypted def _decrypt_sensitive(self, config_data: Dict[str, Any]) -> Dict[str, Any]: """Decrypt sensitive values in-place and return the config dict.""" for path in self._SENSITIVE_PATHS: keys = path.split('.') parent = config_data for k in keys[:-1]: if isinstance(parent, dict) and k in parent: parent = parent[k] else: parent = None break if parent is None or not isinstance(parent, dict): continue leaf = keys[-1] if leaf not in parent: continue parent[leaf] = self._decrypt_value(parent[leaf]) return config_data def _migrate_encrypt_if_needed(self): """Re-save config to encrypt any plaintext sensitive values still in the DB.""" try: # Read raw DB content to check if any sensitive value is still plaintext conn = self._connect_db() cursor = conn.cursor() cursor.execute("SELECT value FROM metadata WHERE key = 'app_config'") row = cursor.fetchone() conn.close() if not row or not row[0]: return raw = json.loads(row[0]) needs_migration = False for path in self._SENSITIVE_PATHS: keys = path.split('.') parent = raw for k in keys[:-1]: if isinstance(parent, dict) and k in parent: parent = parent[k] else: parent = None break if parent is None or not isinstance(parent, dict): continue leaf = keys[-1] if leaf not in parent: continue value = parent[leaf] if not value and value != 0: continue # If the value is NOT a Fernet token, it's still plaintext if not (isinstance(value, str) and value.startswith('gAAAAA')): needs_migration = True break if needs_migration: logger.info("Encrypting sensitive config values at rest...") self._save_to_database(self.config_data) logger.info("Sensitive config values encrypted successfully") except Exception as e: logger.warning(f"Could not migrate encryption: {e}") def _connect_db(self) -> sqlite3.Connection: """Open a configured SQLite connection for the config DB. Centralizes pragma setup so every connection gets WAL mode, a 30s busy timeout, and synchronous=NORMAL (the safe pairing with WAL that avoids unnecessary fsyncs on slow disks). """ conn = sqlite3.connect(str(self.database_path), timeout=30.0) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA busy_timeout=30000") conn.execute("PRAGMA synchronous=NORMAL") return conn def _ensure_database_exists(self): """Ensure database file and metadata table exist""" try: # Create database directory if it doesn't exist self.database_path.parent.mkdir(parents=True, exist_ok=True) # Connect to database (creates file if it doesn't exist) conn = self._connect_db() cursor = conn.cursor() # Create metadata table if it doesn't exist cursor.execute(""" CREATE TABLE IF NOT EXISTS metadata ( key TEXT PRIMARY KEY, value TEXT, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) conn.commit() conn.close() except Exception as e: logger.warning(f"Could not ensure database exists: {e}") def _load_from_database(self) -> Optional[Dict[str, Any]]: """Load configuration from database, decrypting sensitive values.""" conn = None try: self._ensure_database_exists() conn = self._connect_db() cursor = conn.cursor() cursor.execute("SELECT value FROM metadata WHERE key = 'app_config'") row = cursor.fetchone() if row and row[0]: config_data = json.loads(row[0]) # Decrypt sensitive values (gracefully handles plaintext migration) config_data = self._decrypt_sensitive(config_data) logger.info("Configuration loaded from database") return config_data else: return None except Exception as e: logger.warning(f"Could not load config from database: {e}") return None finally: if conn: conn.close() def _load_stored_log_level(self) -> Optional[str]: """Load the persisted UI log level preference, if one exists.""" conn = None try: self._ensure_database_exists() conn = self._connect_db() cursor = conn.cursor() cursor.execute("SELECT value FROM metadata WHERE key = 'log_level'") row = cursor.fetchone() if not row or not row[0]: return None level = str(row[0]).upper() if level not in self._VALID_LOG_LEVELS: logger.warning(f"Ignoring invalid stored log level: {row[0]}") return None return level except Exception as e: logger.warning(f"Could not load stored log level from database: {e}") return None finally: if conn: conn.close() def _load_env_log_level(self) -> Optional[str]: """Load the log level override from the environment, if one exists.""" raw_level = os.environ.get("SOULSYNC_LOG_LEVEL") if not raw_level: return None level = raw_level.upper() if level not in self._VALID_LOG_LEVELS: logger.warning(f"Ignoring invalid SOULSYNC_LOG_LEVEL value: {raw_level}") return None return level def _apply_log_level_overrides(self, config_data: Dict[str, Any]) -> Dict[str, Any]: """Overlay env and persisted log level preferences onto the loaded config.""" env_level = self._load_env_log_level() if env_level: config_data.setdefault("logging", {})["level"] = env_level logger.info(f"Using log level from SOULSYNC_LOG_LEVEL: {env_level}") return config_data stored_level = self._load_stored_log_level() if stored_level: config_data.setdefault("logging", {})["level"] = stored_level logger.info(f"Using stored logging level from database: {stored_level}") return config_data def _save_to_database(self, config_data: Dict[str, Any]) -> bool: """Save configuration to database, encrypting sensitive values. Returns ``True`` on success. Transient ``database is locked`` failures are logged at DEBUG so the caller's retry loop owns the user-visible error message — otherwise every retry would spam ERROR-level logs even when the next attempt succeeds. """ conn = None try: self._ensure_database_exists() # Encrypt sensitive values before writing (original dict is untouched) encrypted_data = self._encrypt_sensitive(config_data) conn = self._connect_db() cursor = conn.cursor() config_json = json.dumps(encrypted_data, indent=2) cursor.execute(""" INSERT OR REPLACE INTO metadata (key, value, updated_at) VALUES ('app_config', ?, CURRENT_TIMESTAMP) """, (config_json,)) conn.commit() return True except sqlite3.OperationalError as e: # SQLite raises OperationalError("database is locked") when the # busy_timeout expires while another writer holds the lock. # Log at DEBUG so the caller can decide whether the final # outcome warrants an ERROR-level message. if "locked" in str(e).lower(): logger.debug(f"Config DB locked, will retry: {e}") else: logger.error(f"Could not save config to database: {e}") return False except Exception as e: logger.error(f"Could not save config to database: {e}") return False finally: if conn: conn.close() def _load_from_config_file(self) -> Optional[Dict[str, Any]]: """Load configuration from config.json file (for migration)""" try: if self.config_path.exists(): with open(self.config_path, 'r') as f: config_data = json.load(f) logger.info(f"Configuration loaded from {self.config_path}") return config_data else: return None except Exception as e: logger.warning(f"Could not load config from file: {e}") return None def _get_default_config(self) -> Dict[str, Any]: """Get default configuration""" return { "active_media_server": "plex", "spotify": { "client_id": "", "client_secret": "", "redirect_uri": "http://127.0.0.1:8888/callback" }, "tidal": { "client_id": "", "client_secret": "", "redirect_uri": "http://127.0.0.1:8889/tidal/callback" }, "plex": { "base_url": "", "token": "", "auto_detect": True }, "jellyfin": { "base_url": "", "api_key": "", "auto_detect": True }, "navidrome": { "base_url": "", "username": "", "password": "", "auto_detect": True }, "soulseek": { "slskd_url": "", "api_key": "", "download_path": "./downloads", "transfer_path": "./Transfer", "max_peer_queue": 0, "download_timeout": 600, # Reddit report (YeloMelo95, Bell Canada): the existing # 35-per-220s sliding-window cap allows all 35 searches in # rapid succession before throttling — that burst trips ISP # anti-abuse. This knob forces a min gap between consecutive # searches even when the window cap isn't hit. 0 = disabled # (preserves prior behavior). "search_min_delay_seconds": 0, }, "download_source": { "mode": "soulseek", # Options: "soulseek", "youtube", "tidal", "qobuz", "hifi", "hybrid", "torrent", "usenet" "hybrid_primary": "soulseek", # Legacy: primary source for hybrid mode "hybrid_secondary": "youtube", # Legacy: fallback source for hybrid mode "hybrid_order": [], # Ordered list of sources for hybrid mode (overrides primary/secondary) "stream_source": "youtube", # Options: "youtube" (instant, default), "active" (use download source; falls back to youtube if soulseek) # Album-bundle (torrent / usenet single-source) poll tuning. # Downloader is polled every N seconds until the release # lands; whole job aborts at the timeout. Defaults match # the previous hard-coded constants. Users on slow private # trackers / large box sets can extend the timeout without # editing source. "album_bundle_poll_interval_seconds": 2.0, "album_bundle_timeout_seconds": 6 * 60 * 60, # 6 hours }, "tidal_download": { "quality": "lossless", # Options: "low", "high", "lossless", "hires" "session": { "token_type": "", "access_token": "", "refresh_token": "", "expiry_time": 0 } }, "qobuz": { "quality": "lossless", # Options: "mp3", "lossless", "hires", "hires_max" "session": { "app_id": "", "app_secret": "", "user_auth_token": "" } }, "hifi_download": { "quality": "lossless", # Options: "low", "high", "lossless", "hires" }, "hifi": { "embed_tags": True, "tags": { "track_id": True, "artist_id": True, "isrc": True, "bpm": True, "copyright": True, } }, "lidarr_download": { "url": "", "api_key": "", "root_folder": "", "quality_profile": "Any", "cleanup_after_import": True, }, # Prowlarr — indexer aggregator. Feeds the torrent / usenet # download plugins. Not a standalone source. "prowlarr": { "url": "", "api_key": "", # Comma-separated list of indexer IDs to limit searches to. # Empty = search all enabled indexers. "indexer_ids": "", }, # Torrent client — receives .torrent / magnet URIs from the # torrent download plugin. ``type`` picks which adapter to # instantiate (qbittorrent | transmission | deluge). "torrent_client": { "type": "qbittorrent", "url": "", "username": "", "password": "", "category": "soulsync", "save_path": "", }, # Usenet client — receives .nzb URLs / payloads. ``type`` # picks the adapter (sabnzbd | nzbget). SABnzbd uses an # API key; NZBGet uses username + password. "usenet_client": { "type": "sabnzbd", "url": "", "api_key": "", "username": "", "password": "", "category": "soulsync", }, "soundcloud_download": { # Anonymous-only for now — SoundCloud Go+ OAuth tier could be # added later, with credentials living under a "session" subkey # alongside Tidal/Qobuz. No quality knob: anonymous SoundCloud # caps at the upload's transcoding (typically 128 kbps MP3 or # AAC). yt-dlp resolves bestaudio at download time. }, "listenbrainz": { "base_url": "", "token": "", "scrobble_enabled": False }, "acoustid": { "api_key": "", "enabled": False # Disabled by default - requires API key and fpcalc }, "lastfm": { "api_key": "", "api_secret": "", "session_key": "", "scrobble_enabled": False }, "genius": { "access_token": "" }, "logging": { "path": "logs/app.log", "level": "INFO" }, "database": { "path": os.environ.get('DATABASE_PATH', 'database/music_library.db'), "max_workers": 5 }, "image_cache": { "enabled": True, "path": "storage/image_cache", "ttl_seconds": 2592000, "failed_ttl_seconds": 21600, "max_download_mb": 15 }, "metadata_enhancement": { "enabled": True, "embed_album_art": True, "post_process_order": ["musicbrainz", "deezer", "audiodb", "tidal", "qobuz", "lastfm", "genius"] }, "musicbrainz": { "embed_tags": True }, "playlist_sync": { "create_backup": True }, "settings": { "audio_quality": "flac" }, "lossy_copy": { "enabled": False, "codec": "mp3", "bitrate": "320", "delete_original": False, "downsample_hires": False }, "listening_stats": { "enabled": True, "poll_interval": 30 }, "library": { "music_paths": [], "music_videos_path": "" }, "scripts": { "path": "./scripts", "timeout": 60 }, "import": { "staging_path": "./Staging", "replace_lower_quality": False }, "m3u_export": { "enabled": False, "entry_base_path": "" }, "youtube": { "cookies_browser": "", # "", "chrome", "firefox", "edge", "brave", "opera", "safari" "download_delay": 3, # seconds between sequential downloads }, "hydrabase": { "url": "", "api_key": "", "auto_connect": False, "enabled": False }, "content_filter": { "allow_explicit": True } } def _load_config(self): """ Load configuration with priority: 1. Database (primary storage) 2. config.json (migration from file-based config) 3. Defaults (fresh install) """ logger.info("Loading configuration...") # Try loading from database first config_data = self._load_from_database() if config_data: # Configuration exists in database self.config_data = self._apply_log_level_overrides(config_data) # Ensure sensitive values are encrypted at rest (one-time migration) self._migrate_encrypt_if_needed() return # Database is empty - try migration from config.json logger.info(f"Configuration not found in database. Attempting migration from: {self.config_path}") config_data = self._load_from_config_file() if config_data: # Migrate from config.json to database logger.info("Migrating configuration from config.json to database...") if self._save_to_database(config_data): logger.info("Configuration migrated successfully to database.") self.config_data = self._apply_log_level_overrides(config_data) return else: logger.warning("Migration failed - using file-based config temporarily.") self.config_data = self._apply_log_level_overrides(config_data) return # No config.json either - use defaults logger.info("No existing configuration found (DB or File) - using defaults") config_data = self._get_default_config() # Try to save defaults to database if self._save_to_database(config_data): logger.info("Default configuration saved to database") else: logger.warning("Could not save defaults to database - using in-memory config") self.config_data = self._apply_log_level_overrides(config_data) def _save_config(self): """Save configuration to database with exponential-backoff retry on lock. Spread retries over ~7 seconds so a long-held writer (enrichment worker batch insert, library scan commit, etc.) on a slow disk has time to release the lock before we fall back to the JSON file. The single 1s retry that used to live here gave up too early on HDD-backed Docker volumes. """ # Cumulative delay across attempts: 0.2 + 0.5 + 1.0 + 2.0 + 4.0 = 7.7s # plus the 30s busy_timeout that already runs inside each attempt. retry_delays = [0.2, 0.5, 1.0, 2.0, 4.0] if self._save_to_database(self.config_data): return for delay in retry_delays: time.sleep(delay) if self._save_to_database(self.config_data): return # All retries exhausted — fall back to config.json so the user # doesn't lose their settings, then log a single error. logger.error( f"Config DB save failed after {len(retry_delays) + 1} attempts (database is locked) — " "falling back to config.json" ) try: self.config_path.parent.mkdir(parents=True, exist_ok=True) with open(self.config_path, 'w') as f: json.dump(self.config_data, f, indent=2) logger.warning("Configuration saved to config.json as fallback") except Exception as e: logger.error(f"Failed to save configuration: {e}") def get(self, key: str, default: Any = None) -> Any: keys = key.split('.') value = self.config_data for k in keys: if isinstance(value, dict) and k in value: value = value[k] else: return default return value def set(self, key: str, value: Any): keys = key.split('.') config = self.config_data for k in keys[:-1]: if k not in config: config[k] = {} config = config[k] config[keys[-1]] = value self._save_config() def get_spotify_config(self) -> Dict[str, str]: return self.get('spotify', {}) def get_plex_config(self) -> Dict[str, str]: return self.get('plex', {}) def get_jellyfin_config(self) -> Dict[str, str]: return self.get('jellyfin', {}) def get_navidrome_config(self) -> Dict[str, str]: return self.get('navidrome', {}) def get_soulseek_config(self) -> Dict[str, str]: return self.get('soulseek', {}) def get_hydrabase_config(self) -> Dict[str, str]: return self.get('hydrabase', {}) def get_settings(self) -> Dict[str, Any]: return self.get('settings', {}) def get_database_config(self) -> Dict[str, str]: return self.get('database', {}) def get_logging_config(self) -> Dict[str, str]: return self.get('logging', {}) def get_active_media_server(self) -> str: return self.get('active_media_server', 'plex') def set_active_media_server(self, server: str): """Set the active media server (plex, jellyfin, navidrome, or soulsync)""" if server not in ['plex', 'jellyfin', 'navidrome', 'soulsync']: raise ValueError(f"Invalid media server: {server}") self.set('active_media_server', server) def get_active_media_server_config(self) -> Dict[str, str]: """Get configuration for the currently active media server""" active_server = self.get_active_media_server() if active_server == 'plex': return self.get_plex_config() elif active_server == 'jellyfin': return self.get_jellyfin_config() elif active_server == 'navidrome': return self.get_navidrome_config() elif active_server == 'soulsync': return {'transfer_path': self.get('soulseek.transfer_path', './Transfer')} else: return {} def is_configured(self) -> bool: spotify = self.get_spotify_config() active_server = self.get_active_media_server() soulseek = self.get_soulseek_config() # Check active media server configuration media_server_configured = False if active_server == 'plex': plex = self.get_plex_config() media_server_configured = bool(plex.get('base_url')) and bool(plex.get('token')) elif active_server == 'jellyfin': jellyfin = self.get_jellyfin_config() media_server_configured = bool(jellyfin.get('base_url')) and bool(jellyfin.get('api_key')) elif active_server == 'navidrome': navidrome = self.get_navidrome_config() media_server_configured = bool(navidrome.get('base_url')) and bool(navidrome.get('username')) and bool(navidrome.get('password')) elif active_server == 'soulsync': media_server_configured = True # SoulSync standalone is always configured return ( bool(spotify.get('client_id')) and bool(spotify.get('client_secret')) and media_server_configured and bool(soulseek.get('slskd_url')) ) def validate_config(self) -> Dict[str, bool]: active_server = self.get_active_media_server() validation = { 'spotify': bool(self.get('spotify.client_id')) and bool(self.get('spotify.client_secret')), 'soulseek': bool(self.get('soulseek.slskd_url')) } # Validate all server types but mark active one validation['plex'] = bool(self.get('plex.base_url')) and bool(self.get('plex.token')) validation['jellyfin'] = bool(self.get('jellyfin.base_url')) and bool(self.get('jellyfin.api_key')) validation['navidrome'] = bool(self.get('navidrome.base_url')) and bool(self.get('navidrome.username')) and bool(self.get('navidrome.password')) validation['soulsync'] = True # Standalone mode is always valid validation['active_media_server'] = active_server return validation config_manager = ConfigManager()