From 8a4672e2eb347a3c2db5d6626b86ff4dfbc212e9 Mon Sep 17 00:00:00 2001 From: Broque Thomas <26755000+Nezreka@users.noreply.github.com> Date: Thu, 12 Mar 2026 09:25:25 -0700 Subject: [PATCH] =?UTF-8?q?Encrypt=20sensitive=20config=20values=20at=20re?= =?UTF-8?q?st=20with=20Fernet=20=E2=80=94=20transparent=20migration,=20zer?= =?UTF-8?q?o=20breaking=20changes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 + api/settings.py | 14 +++- config/settings.py | 197 ++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 201 insertions(+), 13 deletions(-) diff --git a/.gitignore b/.gitignore index 92f6f279..7a5ffa29 100644 --- a/.gitignore +++ b/.gitignore @@ -7,5 +7,8 @@ __pycache__/ *.pyc *.pyo +# Encryption key (generated per-instance, lives next to database) +.encryption_key + # Auto-downloaded binaries bin/ diff --git a/api/settings.py b/api/settings.py index 94d779bf..6680c5f0 100644 --- a/api/settings.py +++ b/api/settings.py @@ -8,14 +8,22 @@ from .helpers import api_success, api_error # Keys that must NEVER be exposed via the API _SENSITIVE_KEYS = { + "spotify.client_id", "spotify.client_secret", - "soulseek.api_key", + "tidal.client_id", + "tidal.client_secret", + "tidal_tokens", + "tidal_download.session", + "qobuz.session", "plex.token", "jellyfin.api_key", "navidrome.password", - "hydrabase.api_key", - "tidal_download.session", + "soulseek.api_key", "listenbrainz.token", + "acoustid.api_key", + "lastfm.api_key", + "genius.access_token", + "hydrabase.api_key", } diff --git a/config/settings.py b/config/settings.py index 0886bbad..339baf4c 100644 --- a/config/settings.py +++ b/config/settings.py @@ -1,8 +1,9 @@ +import copy import json import os import sqlite3 from typing import Dict, Any, Optional -from cryptography.fernet import Fernet +from cryptography.fernet import Fernet, InvalidToken from pathlib import Path class ConfigManager: @@ -35,7 +36,7 @@ class ConfigManager: print(f"🔧 ConfigManager initialized with path: {self.config_path}") self.config_data: Dict[str, Any] = {} - self.encryption_key: Optional[bytes] = None + self._fernet: Optional[Fernet] = None # Use DATABASE_PATH env var, fallback to database/music_library.db db_path_env = os.environ.get('DATABASE_PATH') @@ -58,17 +59,186 @@ class ConfigManager: self._load_config() - def _get_encryption_key(self) -> bytes: - key_file = self.config_path.parent / ".encryption_key" + # Dot-notation paths to sensitive config values that must be encrypted at rest. + # Paths pointing to dicts encrypt the entire dict as a JSON blob. + _SENSITIVE_PATHS = frozenset({ + # Spotify + 'spotify.client_id', + 'spotify.client_secret', + # Tidal + 'tidal.client_id', + 'tidal.client_secret', + 'tidal_tokens', # full dict (access/refresh tokens) + 'tidal_download.session', # full dict (access/refresh/expiry) + # Qobuz + 'qobuz.session', # full dict (app_id, app_secret, user_auth_token) + # Media servers + 'plex.token', + 'jellyfin.api_key', + 'navidrome.password', + # Download sources + 'soulseek.api_key', + # Enrichment services + 'listenbrainz.token', + 'acoustid.api_key', + 'lastfm.api_key', + 'genius.access_token', + # Other + 'hydrabase.api_key', + }) + + def _get_fernet(self) -> Fernet: + """Return a cached Fernet instance, creating the key file if needed.""" + if self._fernet is not None: + return self._fernet + key_file = self.database_path.parent / ".encryption_key" + # Migrate key from old location (config/) to new location (database/) + old_key_file = self.config_path.parent / ".encryption_key" + if not key_file.exists() and old_key_file.exists(): + try: + import shutil + shutil.move(str(old_key_file), str(key_file)) + print(f"[MIGRATE] 🔑 Moved encryption key to {key_file}") + except Exception: + key_file = old_key_file # Fall back to old location if key_file.exists(): with open(key_file, 'rb') as f: - return f.read() + key = f.read() else: key = Fernet.generate_key() + key_file.parent.mkdir(parents=True, exist_ok=True) with open(key_file, 'wb') as f: f.write(key) - key_file.chmod(0o600) - return key + try: + key_file.chmod(0o600) + except OSError: + pass # Windows may not support Unix permissions + self._fernet = Fernet(key) + return self._fernet + + def _encrypt_value(self, value) -> str: + """Encrypt a config value (string or dict/list) into a Fernet token string.""" + f = self._get_fernet() + if isinstance(value, (dict, list)): + plaintext = json.dumps(value) + else: + plaintext = str(value) + return f.encrypt(plaintext.encode('utf-8')).decode('ascii') + + def _decrypt_value(self, value): + """Decrypt a Fernet token string back to the original value. + If value is not encrypted (migration), returns it unchanged.""" + if not isinstance(value, str): + return value + # Fernet tokens always start with 'gAAAAA' + if not value.startswith('gAAAAA'): + return value + try: + f = self._get_fernet() + decrypted = f.decrypt(value.encode('ascii')).decode('utf-8') + # Only parse JSON for dicts/lists (starts with { or [). + # Plain strings (including numeric ones like API keys) stay as strings. + if decrypted and decrypted[0] in ('{', '['): + try: + return json.loads(decrypted) + except (json.JSONDecodeError, ValueError): + pass + return decrypted + except InvalidToken: + # Key mismatch — encrypted with a different key (key file deleted/replaced) + print(f"[ERROR] ⚠️ Failed to decrypt a config value — encryption key may have changed. " + f"Re-enter credentials in Settings or restore the original .encryption_key file.") + return value + except Exception: + return value + + def _encrypt_sensitive(self, config_data: Dict[str, Any]) -> Dict[str, Any]: + """Return a deep copy of config_data with sensitive values encrypted.""" + encrypted = copy.deepcopy(config_data) + for path in self._SENSITIVE_PATHS: + keys = path.split('.') + # Navigate to the parent + parent = encrypted + for k in keys[:-1]: + if isinstance(parent, dict) and k in parent: + parent = parent[k] + else: + parent = None + break + if parent is None or not isinstance(parent, dict): + continue + leaf = keys[-1] + if leaf not in parent: + continue + value = parent[leaf] + # Skip empty values (no point encrypting empty strings/dicts) + if not value and value != 0: + continue + # Skip already-encrypted values (idempotent) + if isinstance(value, str) and value.startswith('gAAAAA'): + continue + parent[leaf] = self._encrypt_value(value) + return encrypted + + def _decrypt_sensitive(self, config_data: Dict[str, Any]) -> Dict[str, Any]: + """Decrypt sensitive values in-place and return the config dict.""" + for path in self._SENSITIVE_PATHS: + keys = path.split('.') + parent = config_data + for k in keys[:-1]: + if isinstance(parent, dict) and k in parent: + parent = parent[k] + else: + parent = None + break + if parent is None or not isinstance(parent, dict): + continue + leaf = keys[-1] + if leaf not in parent: + continue + parent[leaf] = self._decrypt_value(parent[leaf]) + return config_data + + def _migrate_encrypt_if_needed(self): + """Re-save config to encrypt any plaintext sensitive values still in the DB.""" + try: + # Read raw DB content to check if any sensitive value is still plaintext + conn = sqlite3.connect(str(self.database_path)) + cursor = conn.cursor() + cursor.execute("SELECT value FROM metadata WHERE key = 'app_config'") + row = cursor.fetchone() + conn.close() + if not row or not row[0]: + return + raw = json.loads(row[0]) + needs_migration = False + for path in self._SENSITIVE_PATHS: + keys = path.split('.') + parent = raw + for k in keys[:-1]: + if isinstance(parent, dict) and k in parent: + parent = parent[k] + else: + parent = None + break + if parent is None or not isinstance(parent, dict): + continue + leaf = keys[-1] + if leaf not in parent: + continue + value = parent[leaf] + if not value and value != 0: + continue + # If the value is NOT a Fernet token, it's still plaintext + if not (isinstance(value, str) and value.startswith('gAAAAA')): + needs_migration = True + break + if needs_migration: + print("[MIGRATE] 🔐 Encrypting sensitive config values at rest...") + self._save_to_database(self.config_data) + print("[OK] ✅ Sensitive config values encrypted successfully") + except Exception as e: + print(f"[WARN] Could not migrate encryption: {e}") def _ensure_database_exists(self): """Ensure database file and metadata table exist""" @@ -94,7 +264,7 @@ class ConfigManager: print(f"Warning: Could not ensure database exists: {e}") def _load_from_database(self) -> Optional[Dict[str, Any]]: - """Load configuration from database""" + """Load configuration from database, decrypting sensitive values.""" try: self._ensure_database_exists() @@ -106,6 +276,8 @@ class ConfigManager: if row and row[0]: config_data = json.loads(row[0]) + # Decrypt sensitive values (gracefully handles plaintext migration) + config_data = self._decrypt_sensitive(config_data) print("[OK] Configuration loaded from database") return config_data else: @@ -116,14 +288,17 @@ class ConfigManager: return None def _save_to_database(self, config_data: Dict[str, Any]) -> bool: - """Save configuration to database""" + """Save configuration to database, encrypting sensitive values.""" try: self._ensure_database_exists() + # Encrypt sensitive values before writing (original dict is untouched) + encrypted_data = self._encrypt_sensitive(config_data) + conn = sqlite3.connect(str(self.database_path)) cursor = conn.cursor() - config_json = json.dumps(config_data, indent=2) + config_json = json.dumps(encrypted_data, indent=2) cursor.execute(""" INSERT OR REPLACE INTO metadata (key, value, updated_at) VALUES ('app_config', ?, CURRENT_TIMESTAMP) @@ -284,6 +459,8 @@ class ConfigManager: if config_data: # Configuration exists in database self.config_data = config_data + # Ensure sensitive values are encrypted at rest (one-time migration) + self._migrate_encrypt_if_needed() return # Database is empty - try migration from config.json