|
|
|
|
@ -1,8 +1,9 @@
|
|
|
|
|
import copy
|
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import sqlite3
|
|
|
|
|
from typing import Dict, Any, Optional
|
|
|
|
|
from cryptography.fernet import Fernet
|
|
|
|
|
from cryptography.fernet import Fernet, InvalidToken
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
|
class ConfigManager:
|
|
|
|
|
@ -35,7 +36,7 @@ class ConfigManager:
|
|
|
|
|
print(f"🔧 ConfigManager initialized with path: {self.config_path}")
|
|
|
|
|
|
|
|
|
|
self.config_data: Dict[str, Any] = {}
|
|
|
|
|
self.encryption_key: Optional[bytes] = None
|
|
|
|
|
self._fernet: Optional[Fernet] = None
|
|
|
|
|
|
|
|
|
|
# Use DATABASE_PATH env var, fallback to database/music_library.db
|
|
|
|
|
db_path_env = os.environ.get('DATABASE_PATH')
|
|
|
|
|
@ -58,17 +59,186 @@ class ConfigManager:
|
|
|
|
|
|
|
|
|
|
self._load_config()
|
|
|
|
|
|
|
|
|
|
def _get_encryption_key(self) -> bytes:
|
|
|
|
|
key_file = self.config_path.parent / ".encryption_key"
|
|
|
|
|
# Dot-notation paths to sensitive config values that must be encrypted at rest.
|
|
|
|
|
# Paths pointing to dicts encrypt the entire dict as a JSON blob.
|
|
|
|
|
_SENSITIVE_PATHS = frozenset({
|
|
|
|
|
# Spotify
|
|
|
|
|
'spotify.client_id',
|
|
|
|
|
'spotify.client_secret',
|
|
|
|
|
# Tidal
|
|
|
|
|
'tidal.client_id',
|
|
|
|
|
'tidal.client_secret',
|
|
|
|
|
'tidal_tokens', # full dict (access/refresh tokens)
|
|
|
|
|
'tidal_download.session', # full dict (access/refresh/expiry)
|
|
|
|
|
# Qobuz
|
|
|
|
|
'qobuz.session', # full dict (app_id, app_secret, user_auth_token)
|
|
|
|
|
# Media servers
|
|
|
|
|
'plex.token',
|
|
|
|
|
'jellyfin.api_key',
|
|
|
|
|
'navidrome.password',
|
|
|
|
|
# Download sources
|
|
|
|
|
'soulseek.api_key',
|
|
|
|
|
# Enrichment services
|
|
|
|
|
'listenbrainz.token',
|
|
|
|
|
'acoustid.api_key',
|
|
|
|
|
'lastfm.api_key',
|
|
|
|
|
'genius.access_token',
|
|
|
|
|
# Other
|
|
|
|
|
'hydrabase.api_key',
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
def _get_fernet(self) -> Fernet:
|
|
|
|
|
"""Return a cached Fernet instance, creating the key file if needed."""
|
|
|
|
|
if self._fernet is not None:
|
|
|
|
|
return self._fernet
|
|
|
|
|
key_file = self.database_path.parent / ".encryption_key"
|
|
|
|
|
# Migrate key from old location (config/) to new location (database/)
|
|
|
|
|
old_key_file = self.config_path.parent / ".encryption_key"
|
|
|
|
|
if not key_file.exists() and old_key_file.exists():
|
|
|
|
|
try:
|
|
|
|
|
import shutil
|
|
|
|
|
shutil.move(str(old_key_file), str(key_file))
|
|
|
|
|
print(f"[MIGRATE] 🔑 Moved encryption key to {key_file}")
|
|
|
|
|
except Exception:
|
|
|
|
|
key_file = old_key_file # Fall back to old location
|
|
|
|
|
if key_file.exists():
|
|
|
|
|
with open(key_file, 'rb') as f:
|
|
|
|
|
return f.read()
|
|
|
|
|
key = f.read()
|
|
|
|
|
else:
|
|
|
|
|
key = Fernet.generate_key()
|
|
|
|
|
key_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
with open(key_file, 'wb') as f:
|
|
|
|
|
f.write(key)
|
|
|
|
|
key_file.chmod(0o600)
|
|
|
|
|
return key
|
|
|
|
|
try:
|
|
|
|
|
key_file.chmod(0o600)
|
|
|
|
|
except OSError:
|
|
|
|
|
pass # Windows may not support Unix permissions
|
|
|
|
|
self._fernet = Fernet(key)
|
|
|
|
|
return self._fernet
|
|
|
|
|
|
|
|
|
|
def _encrypt_value(self, value) -> str:
|
|
|
|
|
"""Encrypt a config value (string or dict/list) into a Fernet token string."""
|
|
|
|
|
f = self._get_fernet()
|
|
|
|
|
if isinstance(value, (dict, list)):
|
|
|
|
|
plaintext = json.dumps(value)
|
|
|
|
|
else:
|
|
|
|
|
plaintext = str(value)
|
|
|
|
|
return f.encrypt(plaintext.encode('utf-8')).decode('ascii')
|
|
|
|
|
|
|
|
|
|
def _decrypt_value(self, value):
|
|
|
|
|
"""Decrypt a Fernet token string back to the original value.
|
|
|
|
|
If value is not encrypted (migration), returns it unchanged."""
|
|
|
|
|
if not isinstance(value, str):
|
|
|
|
|
return value
|
|
|
|
|
# Fernet tokens always start with 'gAAAAA'
|
|
|
|
|
if not value.startswith('gAAAAA'):
|
|
|
|
|
return value
|
|
|
|
|
try:
|
|
|
|
|
f = self._get_fernet()
|
|
|
|
|
decrypted = f.decrypt(value.encode('ascii')).decode('utf-8')
|
|
|
|
|
# Only parse JSON for dicts/lists (starts with { or [).
|
|
|
|
|
# Plain strings (including numeric ones like API keys) stay as strings.
|
|
|
|
|
if decrypted and decrypted[0] in ('{', '['):
|
|
|
|
|
try:
|
|
|
|
|
return json.loads(decrypted)
|
|
|
|
|
except (json.JSONDecodeError, ValueError):
|
|
|
|
|
pass
|
|
|
|
|
return decrypted
|
|
|
|
|
except InvalidToken:
|
|
|
|
|
# Key mismatch — encrypted with a different key (key file deleted/replaced)
|
|
|
|
|
print(f"[ERROR] ⚠️ Failed to decrypt a config value — encryption key may have changed. "
|
|
|
|
|
f"Re-enter credentials in Settings or restore the original .encryption_key file.")
|
|
|
|
|
return value
|
|
|
|
|
except Exception:
|
|
|
|
|
return value
|
|
|
|
|
|
|
|
|
|
def _encrypt_sensitive(self, config_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
|
|
"""Return a deep copy of config_data with sensitive values encrypted."""
|
|
|
|
|
encrypted = copy.deepcopy(config_data)
|
|
|
|
|
for path in self._SENSITIVE_PATHS:
|
|
|
|
|
keys = path.split('.')
|
|
|
|
|
# Navigate to the parent
|
|
|
|
|
parent = encrypted
|
|
|
|
|
for k in keys[:-1]:
|
|
|
|
|
if isinstance(parent, dict) and k in parent:
|
|
|
|
|
parent = parent[k]
|
|
|
|
|
else:
|
|
|
|
|
parent = None
|
|
|
|
|
break
|
|
|
|
|
if parent is None or not isinstance(parent, dict):
|
|
|
|
|
continue
|
|
|
|
|
leaf = keys[-1]
|
|
|
|
|
if leaf not in parent:
|
|
|
|
|
continue
|
|
|
|
|
value = parent[leaf]
|
|
|
|
|
# Skip empty values (no point encrypting empty strings/dicts)
|
|
|
|
|
if not value and value != 0:
|
|
|
|
|
continue
|
|
|
|
|
# Skip already-encrypted values (idempotent)
|
|
|
|
|
if isinstance(value, str) and value.startswith('gAAAAA'):
|
|
|
|
|
continue
|
|
|
|
|
parent[leaf] = self._encrypt_value(value)
|
|
|
|
|
return encrypted
|
|
|
|
|
|
|
|
|
|
def _decrypt_sensitive(self, config_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
|
|
"""Decrypt sensitive values in-place and return the config dict."""
|
|
|
|
|
for path in self._SENSITIVE_PATHS:
|
|
|
|
|
keys = path.split('.')
|
|
|
|
|
parent = config_data
|
|
|
|
|
for k in keys[:-1]:
|
|
|
|
|
if isinstance(parent, dict) and k in parent:
|
|
|
|
|
parent = parent[k]
|
|
|
|
|
else:
|
|
|
|
|
parent = None
|
|
|
|
|
break
|
|
|
|
|
if parent is None or not isinstance(parent, dict):
|
|
|
|
|
continue
|
|
|
|
|
leaf = keys[-1]
|
|
|
|
|
if leaf not in parent:
|
|
|
|
|
continue
|
|
|
|
|
parent[leaf] = self._decrypt_value(parent[leaf])
|
|
|
|
|
return config_data
|
|
|
|
|
|
|
|
|
|
def _migrate_encrypt_if_needed(self):
|
|
|
|
|
"""Re-save config to encrypt any plaintext sensitive values still in the DB."""
|
|
|
|
|
try:
|
|
|
|
|
# Read raw DB content to check if any sensitive value is still plaintext
|
|
|
|
|
conn = sqlite3.connect(str(self.database_path))
|
|
|
|
|
cursor = conn.cursor()
|
|
|
|
|
cursor.execute("SELECT value FROM metadata WHERE key = 'app_config'")
|
|
|
|
|
row = cursor.fetchone()
|
|
|
|
|
conn.close()
|
|
|
|
|
if not row or not row[0]:
|
|
|
|
|
return
|
|
|
|
|
raw = json.loads(row[0])
|
|
|
|
|
needs_migration = False
|
|
|
|
|
for path in self._SENSITIVE_PATHS:
|
|
|
|
|
keys = path.split('.')
|
|
|
|
|
parent = raw
|
|
|
|
|
for k in keys[:-1]:
|
|
|
|
|
if isinstance(parent, dict) and k in parent:
|
|
|
|
|
parent = parent[k]
|
|
|
|
|
else:
|
|
|
|
|
parent = None
|
|
|
|
|
break
|
|
|
|
|
if parent is None or not isinstance(parent, dict):
|
|
|
|
|
continue
|
|
|
|
|
leaf = keys[-1]
|
|
|
|
|
if leaf not in parent:
|
|
|
|
|
continue
|
|
|
|
|
value = parent[leaf]
|
|
|
|
|
if not value and value != 0:
|
|
|
|
|
continue
|
|
|
|
|
# If the value is NOT a Fernet token, it's still plaintext
|
|
|
|
|
if not (isinstance(value, str) and value.startswith('gAAAAA')):
|
|
|
|
|
needs_migration = True
|
|
|
|
|
break
|
|
|
|
|
if needs_migration:
|
|
|
|
|
print("[MIGRATE] 🔐 Encrypting sensitive config values at rest...")
|
|
|
|
|
self._save_to_database(self.config_data)
|
|
|
|
|
print("[OK] ✅ Sensitive config values encrypted successfully")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"[WARN] Could not migrate encryption: {e}")
|
|
|
|
|
|
|
|
|
|
def _ensure_database_exists(self):
|
|
|
|
|
"""Ensure database file and metadata table exist"""
|
|
|
|
|
@ -94,7 +264,7 @@ class ConfigManager:
|
|
|
|
|
print(f"Warning: Could not ensure database exists: {e}")
|
|
|
|
|
|
|
|
|
|
def _load_from_database(self) -> Optional[Dict[str, Any]]:
|
|
|
|
|
"""Load configuration from database"""
|
|
|
|
|
"""Load configuration from database, decrypting sensitive values."""
|
|
|
|
|
try:
|
|
|
|
|
self._ensure_database_exists()
|
|
|
|
|
|
|
|
|
|
@ -106,6 +276,8 @@ class ConfigManager:
|
|
|
|
|
|
|
|
|
|
if row and row[0]:
|
|
|
|
|
config_data = json.loads(row[0])
|
|
|
|
|
# Decrypt sensitive values (gracefully handles plaintext migration)
|
|
|
|
|
config_data = self._decrypt_sensitive(config_data)
|
|
|
|
|
print("[OK] Configuration loaded from database")
|
|
|
|
|
return config_data
|
|
|
|
|
else:
|
|
|
|
|
@ -116,14 +288,17 @@ class ConfigManager:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _save_to_database(self, config_data: Dict[str, Any]) -> bool:
|
|
|
|
|
"""Save configuration to database"""
|
|
|
|
|
"""Save configuration to database, encrypting sensitive values."""
|
|
|
|
|
try:
|
|
|
|
|
self._ensure_database_exists()
|
|
|
|
|
|
|
|
|
|
# Encrypt sensitive values before writing (original dict is untouched)
|
|
|
|
|
encrypted_data = self._encrypt_sensitive(config_data)
|
|
|
|
|
|
|
|
|
|
conn = sqlite3.connect(str(self.database_path))
|
|
|
|
|
cursor = conn.cursor()
|
|
|
|
|
|
|
|
|
|
config_json = json.dumps(config_data, indent=2)
|
|
|
|
|
config_json = json.dumps(encrypted_data, indent=2)
|
|
|
|
|
cursor.execute("""
|
|
|
|
|
INSERT OR REPLACE INTO metadata (key, value, updated_at)
|
|
|
|
|
VALUES ('app_config', ?, CURRENT_TIMESTAMP)
|
|
|
|
|
@ -284,6 +459,8 @@ class ConfigManager:
|
|
|
|
|
if config_data:
|
|
|
|
|
# Configuration exists in database
|
|
|
|
|
self.config_data = config_data
|
|
|
|
|
# Ensure sensitive values are encrypted at rest (one-time migration)
|
|
|
|
|
self._migrate_encrypt_if_needed()
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
# Database is empty - try migration from config.json
|
|
|
|
|
|