Profiles Phase 0: service-credential-sets foundation (data + resolver, dormant)

Groundwork for admin-created, per-profile-switchable credential sets ("pills")
across auth services (Spotify/Tidal/Deezer/Qobuz/Plex/Jellyfin/Navidrome).
Strictly additive and dormant — nothing reads it at runtime yet, so zero
behaviour change for existing installs.

- core/credentials/store.py: pure service registry + payload validation +
  stale-safe active-set selection (pick_active_credential falls back to None
  when a selected set was deleted, so a profile never breaks).
- migration service_credentials_v1: two new tables — service_credentials
  (admin-created named sets; payload Fernet-encrypted at rest) and
  profile_service_credentials (each profile's selected set per service).
- MusicDatabase CRUD: create/update/delete/list/get_service_credential
  (list never returns the payload; get decrypts for the resolver), plus
  set/get_profile_service_credential and resolve_profile_service_credential
  (returns the profile's active payload or None → caller uses global default).

Tests: 12 — pure validation + stale-safe selection, and real-temp-DB storage
proving encryption round-trips, payload never lists, dup(service,label)
rejected, per-profile/per-service resolution, and delete clearing dangling
selections to a clean fallback. 95 migration/DB tests still pass.
pull/529/merge
BoulderBadgeDad 3 weeks ago
parent e16216fc2d
commit daee96f814

@ -0,0 +1,79 @@
"""Named, switchable service-credential sets — pure logic (Phase 0 foundation).
Today every auth service (Spotify, Tidal, Deezer, Qobuz, Plex, Jellyfin,
Navidrome) holds ONE credential set in config, and clients are global singletons
built from that single slot. This module is the groundwork for letting an admin
save MULTIPLE named credential sets per service ("pills") that each profile can
switch between, without anyone but the admin creating them.
Kept PURE service registry, payload validation, and active-set selection,
free of DB/Flask so it's unit-testable. Encrypted storage lives in MusicDatabase
(service_credentials / profile_service_credentials tables); runtime client
resolution + UI come in later phases. Nothing here changes existing behaviour;
it's dormant capability until wired.
"""
from __future__ import annotations
# Services that support multiple named credential sets, mapped to the payload
# keys that MUST be present for a set to be usable. Extra keys (OAuth tokens,
# redirect URIs, quality prefs) are allowed and preserved — these are only the
# minimum required to validate a set the admin is saving.
SERVICE_CREDENTIAL_SCHEMA = {
'spotify': ('client_id', 'client_secret'),
'tidal': ('access_token', 'refresh_token'),
'deezer': ('arl',),
'qobuz': ('user_auth_token',),
'plex': ('base_url', 'token'),
'jellyfin': ('base_url', 'api_key'),
'navidrome': ('base_url', 'username', 'password'),
}
SUPPORTED_SERVICES = frozenset(SERVICE_CREDENTIAL_SCHEMA)
def is_supported_service(service: str) -> bool:
"""True when the service supports named credential sets."""
return service in SERVICE_CREDENTIAL_SCHEMA
def validate_credential_payload(service: str, payload):
"""Return ``(ok, missing_keys)`` for a credential set.
Valid when every required key for the service is present and truthy. An
unknown service is invalid with no missing list (caller should reject it
as unsupported, not as "incomplete").
"""
required = SERVICE_CREDENTIAL_SCHEMA.get(service)
if required is None:
return False, []
if not isinstance(payload, dict):
return False, list(required)
missing = [k for k in required if not payload.get(k)]
return (not missing), missing
def pick_active_credential(credentials, selected_id):
"""From ``credentials`` (a list of dicts each carrying ``id``), return the
one whose id == ``selected_id``.
Returns None when there's no selection OR the selected id isn't present
i.e. a stale pointer whose credential set was deleted. The caller then
falls back to the global/admin default, so a deleted set never breaks a
profile. Pure + stale-safe.
"""
if not selected_id:
return None
for cred in credentials or []:
if cred.get('id') == selected_id:
return cred
return None
__all__ = [
'SERVICE_CREDENTIAL_SCHEMA',
'SUPPORTED_SERVICES',
'is_supported_service',
'validate_credential_payload',
'pick_active_credential',
]

@ -461,6 +461,7 @@ class MusicDatabase:
self._add_profile_settings(cursor)
self._add_profile_listenbrainz_support(cursor)
self._add_profile_service_credentials(cursor)
self._add_service_credential_sets(cursor)
self._add_soul_id_columns(cursor)
self._add_listening_history_table(cursor)
@ -3853,6 +3854,228 @@ class MusicDatabase:
except Exception as e:
logger.error(f"Error in per-profile service credentials migration: {e}")
def _add_service_credential_sets(self, cursor):
"""Named, switchable credential sets per auth service + each profile's
selection of which set is active (Phase 0 foundation).
Additive only two new tables, no change to existing tables/columns.
Dormant until the resolver + UI are wired in a later phase, so this
migration changes no runtime behaviour for existing installs.
"""
try:
cursor.execute("SELECT value FROM metadata WHERE key = 'service_credentials_v1' LIMIT 1")
if cursor.fetchone():
return # Already migrated
logger.info("Applying service-credential-sets migration...")
# Admin-created named credential sets. `payload` is a Fernet-encrypted
# JSON blob (same key as per-profile tokens), so secrets stay encrypted
# at rest. UNIQUE(service, label) keeps pill names distinct per service.
cursor.execute("""
CREATE TABLE IF NOT EXISTS service_credentials (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service TEXT NOT NULL,
label TEXT NOT NULL,
payload TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(service, label)
)
""")
# Per-profile selection of which credential set is active for a
# service. A missing row (or NULL credential_id) means "fall back to
# the global/admin default" — so a profile never breaks if its
# chosen set is later removed.
cursor.execute("""
CREATE TABLE IF NOT EXISTS profile_service_credentials (
profile_id INTEGER NOT NULL,
service TEXT NOT NULL,
credential_id INTEGER,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (profile_id, service)
)
""")
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_service_credentials_service "
"ON service_credentials (service)"
)
cursor.execute(
"INSERT OR REPLACE INTO metadata (key, value) VALUES ('service_credentials_v1', '1')"
)
logger.info("Service-credential-sets migration completed")
except Exception as e:
logger.error(f"Error in service-credential-sets migration: {e}")
# ── Service credential sets (named, switchable per profile) ──────────────
def create_service_credential(self, service: str, label: str, payload: dict):
"""Create a named credential set for a service. Returns the new id, or
None on failure / duplicate (service, label). Payload is encrypted."""
try:
from config.settings import config_manager
enc = config_manager._encrypt_value(payload) if payload else None
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO service_credentials (service, label, payload) VALUES (?, ?, ?)",
(service, label, enc),
)
conn.commit()
return cursor.lastrowid
except sqlite3.IntegrityError:
logger.warning(f"Service credential '{label}' already exists for {service}")
return None
except Exception as e:
logger.error(f"Error creating service credential ({service}/{label}): {e}")
return None
def update_service_credential(self, credential_id: int, label: str = None,
payload: dict = None) -> bool:
"""Update a credential set's label and/or payload. Only provided fields
change. Returns True if a row was updated."""
try:
from config.settings import config_manager
sets, params = [], []
if label is not None:
sets.append("label = ?")
params.append(label)
if payload is not None:
sets.append("payload = ?")
params.append(config_manager._encrypt_value(payload) if payload else None)
if not sets:
return False
sets.append("updated_at = CURRENT_TIMESTAMP")
params.append(credential_id)
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
f"UPDATE service_credentials SET {', '.join(sets)} WHERE id = ?", params
)
conn.commit()
return cursor.rowcount > 0
except sqlite3.IntegrityError:
logger.warning(f"Rename of credential {credential_id} collides with an existing label")
return False
except Exception as e:
logger.error(f"Error updating service credential {credential_id}: {e}")
return False
def delete_service_credential(self, credential_id: int) -> bool:
"""Delete a credential set and clear any profile selections that point
at it (so those profiles fall back to the global default). Returns True
if the set existed."""
try:
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"UPDATE profile_service_credentials SET credential_id = NULL, "
"updated_at = CURRENT_TIMESTAMP WHERE credential_id = ?",
(credential_id,),
)
cursor.execute("DELETE FROM service_credentials WHERE id = ?", (credential_id,))
conn.commit()
return cursor.rowcount > 0
except Exception as e:
logger.error(f"Error deleting service credential {credential_id}: {e}")
return False
def list_service_credentials(self, service: str = None):
"""List credential sets (metadata only — never the payload). Optionally
filtered to one service. Returns dicts: id, service, label, timestamps."""
try:
with self._get_connection() as conn:
cursor = conn.cursor()
if service:
cursor.execute(
"SELECT id, service, label, created_at, updated_at FROM service_credentials "
"WHERE service = ? ORDER BY label COLLATE NOCASE",
(service,),
)
else:
cursor.execute(
"SELECT id, service, label, created_at, updated_at FROM service_credentials "
"ORDER BY service, label COLLATE NOCASE"
)
return [
{'id': r[0], 'service': r[1], 'label': r[2],
'created_at': r[3], 'updated_at': r[4]}
for r in cursor.fetchall()
]
except Exception as e:
logger.error(f"Error listing service credentials: {e}")
return []
def get_service_credential(self, credential_id: int):
"""Get a credential set WITH its decrypted payload, or None. For the
resolver / client wiring not for shipping to the browser."""
try:
from config.settings import config_manager
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT id, service, label, payload FROM service_credentials WHERE id = ?",
(credential_id,),
)
row = cursor.fetchone()
if not row:
return None
payload = config_manager._decrypt_value(row[3]) if row[3] else {}
return {'id': row[0], 'service': row[1], 'label': row[2],
'payload': payload if isinstance(payload, dict) else {}}
except Exception as e:
logger.error(f"Error reading service credential {credential_id}: {e}")
return None
def set_profile_service_credential(self, profile_id: int, service: str,
credential_id) -> bool:
"""Select which credential set is active for a profile + service.
Pass credential_id=None to clear (fall back to global). Upsert."""
try:
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO profile_service_credentials (profile_id, service, credential_id) "
"VALUES (?, ?, ?) "
"ON CONFLICT(profile_id, service) DO UPDATE SET "
"credential_id = excluded.credential_id, updated_at = CURRENT_TIMESTAMP",
(profile_id, service, credential_id),
)
conn.commit()
return True
except Exception as e:
logger.error(f"Error setting profile {profile_id} {service} credential: {e}")
return False
def get_profile_service_credential_id(self, profile_id: int, service: str):
"""Return the credential_id a profile selected for a service, or None."""
try:
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT credential_id FROM profile_service_credentials "
"WHERE profile_id = ? AND service = ?",
(profile_id, service),
)
row = cursor.fetchone()
return row[0] if row else None
except Exception as e:
logger.error(f"Error reading profile {profile_id} {service} selection: {e}")
return None
def resolve_profile_service_credential(self, profile_id: int, service: str):
"""Resolve a profile's ACTIVE credential payload for a service: the
decrypted payload of its selected set, or None when it hasn't selected
one (or the set was deleted) caller then uses the global/admin default.
Stale-safe: a dangling selection resolves to None, not an error."""
cred_id = self.get_profile_service_credential_id(profile_id, service)
if not cred_id:
return None
cred = self.get_service_credential(cred_id)
return cred['payload'] if cred else None
def _add_soul_id_columns(self, cursor):
"""Add soul_id columns to artists, albums, and tracks tables."""
try:

@ -0,0 +1,140 @@
"""Phase 0: named, switchable service-credential sets.
Foundation for letting an admin save multiple named credential sets per auth
service ("pills") that each profile can switch between. These cover the PURE
selection/validation logic and the encrypted DB storage + per-profile selection
+ resolver with real temp databases (not mocks) so encryption round-trips and
the stale-selection fallback are genuinely exercised.
This layer is dormant (nothing reads it at runtime yet), so it can't regress
existing behaviour the tests pin the contract the later wiring will rely on.
"""
from __future__ import annotations
import pytest
from core.credentials.store import (
SUPPORTED_SERVICES,
is_supported_service,
validate_credential_payload,
pick_active_credential,
)
from database.music_database import MusicDatabase
# ── pure: validation ────────────────────────────────────────────────────────
def test_supported_services_cover_the_auth_sources():
for s in ('spotify', 'tidal', 'deezer', 'qobuz', 'plex', 'jellyfin', 'navidrome'):
assert is_supported_service(s)
assert not is_supported_service('itunes') # no auth → not a credential service
assert not is_supported_service('musicbrainz')
def test_validate_payload_ok_and_missing():
ok, missing = validate_credential_payload('spotify', {'client_id': 'a', 'client_secret': 'b'})
assert ok and missing == []
ok, missing = validate_credential_payload('spotify', {'client_id': 'a'})
assert not ok and missing == ['client_secret']
def test_validate_payload_unknown_service_and_non_dict():
assert validate_credential_payload('nope', {'x': 1}) == (False, [])
ok, missing = validate_credential_payload('plex', None)
assert not ok and set(missing) == {'base_url', 'token'}
def test_validate_treats_empty_string_as_missing():
ok, missing = validate_credential_payload('navidrome',
{'base_url': 'http://x', 'username': '', 'password': 'p'})
assert not ok and missing == ['username']
# ── pure: active-set selection (stale-safe) ──────────────────────────────────
def test_pick_active_credential_match_and_misses():
creds = [{'id': 1, 'label': 'A'}, {'id': 2, 'label': 'B'}]
assert pick_active_credential(creds, 2)['label'] == 'B'
assert pick_active_credential(creds, None) is None # no selection
assert pick_active_credential(creds, 99) is None # stale id (set deleted)
assert pick_active_credential([], 1) is None
assert pick_active_credential(None, 1) is None
# ── DB: storage + selection + resolver (real temp DB, real encryption) ───────
@pytest.fixture
def db(tmp_path):
return MusicDatabase(database_path=str(tmp_path / 'creds.db'))
def test_create_get_roundtrip_encrypts_payload(db, tmp_path):
cid = db.create_service_credential('spotify', "Brock's Spotify",
{'client_id': 'abc', 'client_secret': 'sek'})
assert cid
got = db.get_service_credential(cid)
assert got['service'] == 'spotify' and got['label'] == "Brock's Spotify"
assert got['payload'] == {'client_id': 'abc', 'client_secret': 'sek'}
# The on-disk payload must be ciphertext, never the plaintext secret.
import sqlite3
raw = sqlite3.connect(str(tmp_path / 'creds.db')).execute(
"SELECT payload FROM service_credentials WHERE id = ?", (cid,)).fetchone()[0]
assert 'sek' not in raw and raw.startswith('gAAAAA')
def test_duplicate_label_per_service_rejected(db):
assert db.create_service_credential('spotify', 'Main', {'client_id': 'a', 'client_secret': 'b'})
assert db.create_service_credential('spotify', 'Main', {'client_id': 'c', 'client_secret': 'd'}) is None
# same label is fine under a DIFFERENT service
assert db.create_service_credential('tidal', 'Main', {'access_token': 't', 'refresh_token': 'r'})
def test_list_never_exposes_payload(db):
db.create_service_credential('spotify', 'One', {'client_id': 'a', 'client_secret': 'b'})
db.create_service_credential('deezer', 'Two', {'arl': 'xyz'})
rows = db.list_service_credentials()
assert {r['label'] for r in rows} == {'One', 'Two'}
assert all('payload' not in r for r in rows)
assert [r['label'] for r in db.list_service_credentials('deezer')] == ['Two']
def test_update_label_and_payload(db):
cid = db.create_service_credential('qobuz', 'Q', {'user_auth_token': 'tok'})
assert db.update_service_credential(cid, label='Q renamed')
assert db.update_service_credential(cid, payload={'user_auth_token': 'newtok'})
got = db.get_service_credential(cid)
assert got['label'] == 'Q renamed' and got['payload']['user_auth_token'] == 'newtok'
def test_profile_selection_resolves_and_falls_back(db):
cid = db.create_service_credential('spotify', 'Shared', {'client_id': 'a', 'client_secret': 'b'})
# No selection → None (caller uses global default)
assert db.resolve_profile_service_credential(7, 'spotify') is None
db.set_profile_service_credential(7, 'spotify', cid)
assert db.resolve_profile_service_credential(7, 'spotify') == {'client_id': 'a', 'client_secret': 'b'}
# Clearing the selection falls back again
db.set_profile_service_credential(7, 'spotify', None)
assert db.resolve_profile_service_credential(7, 'spotify') is None
def test_delete_clears_selections_and_resolves_to_fallback(db):
cid = db.create_service_credential('spotify', 'Temp', {'client_id': 'a', 'client_secret': 'b'})
db.set_profile_service_credential(2, 'spotify', cid)
db.set_profile_service_credential(9, 'spotify', cid)
assert db.delete_service_credential(cid)
# Both profiles' dangling selections resolve to None, not an error.
assert db.resolve_profile_service_credential(2, 'spotify') is None
assert db.resolve_profile_service_credential(9, 'spotify') is None
assert db.get_service_credential(cid) is None
def test_selection_is_per_profile_and_per_service(db):
sp = db.create_service_credential('spotify', 'SP', {'client_id': 'a', 'client_secret': 'b'})
td = db.create_service_credential('tidal', 'TD', {'access_token': 't', 'refresh_token': 'r'})
db.set_profile_service_credential(1, 'spotify', sp)
db.set_profile_service_credential(1, 'tidal', td)
assert db.resolve_profile_service_credential(1, 'spotify')['client_id'] == 'a'
assert db.resolve_profile_service_credential(1, 'tidal')['access_token'] == 't'
# A different profile shares the pool but has its own (empty) selection.
assert db.resolve_profile_service_credential(2, 'spotify') is None
Loading…
Cancel
Save