mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
102 lines
3.2 KiB
102 lines
3.2 KiB
"""
|
|
API key authentication for the SoulSync public API.
|
|
"""
|
|
|
|
import hashlib
|
|
import secrets
|
|
import threading
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
from functools import wraps
|
|
|
|
from flask import request, current_app
|
|
|
|
from .helpers import api_error
|
|
|
|
|
|
# Throttle persistence of `last_used_at` so every authenticated request
|
|
# does not rewrite the full app config. Maps key_hash -> last-persisted datetime.
|
|
_USAGE_WRITE_INTERVAL = timedelta(minutes=15)
|
|
_last_persisted_usage: dict[str, datetime] = {}
|
|
_usage_lock = threading.Lock()
|
|
|
|
|
|
def _should_persist_usage(key_hash: str, now: datetime) -> bool:
|
|
"""Return True if `last_used_at` for the given key should be written to disk.
|
|
|
|
Thread-safe: tracks the last write per key hash in memory and only returns
|
|
True once per `_USAGE_WRITE_INTERVAL`.
|
|
"""
|
|
with _usage_lock:
|
|
previous = _last_persisted_usage.get(key_hash)
|
|
if previous is None or (now - previous) >= _USAGE_WRITE_INTERVAL:
|
|
_last_persisted_usage[key_hash] = now
|
|
return True
|
|
return False
|
|
|
|
|
|
def generate_api_key(label=""):
|
|
"""Generate a new API key.
|
|
|
|
Returns (raw_key, key_record). The raw key is shown to the user
|
|
exactly once; only the SHA-256 hash is persisted.
|
|
"""
|
|
raw_key = f"sk_{secrets.token_urlsafe(32)}"
|
|
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
|
|
record = {
|
|
"id": str(uuid.uuid4()),
|
|
"label": label,
|
|
"key_hash": key_hash,
|
|
"key_prefix": raw_key[:11], # "sk_" + first 8 chars
|
|
"created_at": datetime.now(timezone.utc).isoformat(),
|
|
"last_used_at": None,
|
|
}
|
|
return raw_key, record
|
|
|
|
|
|
def _hash_key(raw_key):
|
|
return hashlib.sha256(raw_key.encode()).hexdigest()
|
|
|
|
|
|
def require_api_key(f):
|
|
"""Decorator that enforces API key authentication."""
|
|
|
|
@wraps(f)
|
|
def decorated(*args, **kwargs):
|
|
# Extract key from header or query param
|
|
api_key = None
|
|
auth_header = request.headers.get("Authorization", "")
|
|
if auth_header.startswith("Bearer "):
|
|
api_key = auth_header[7:]
|
|
if not api_key:
|
|
api_key = request.args.get("api_key")
|
|
|
|
if not api_key:
|
|
return api_error("AUTH_REQUIRED", "API key is required. "
|
|
"Pass via Authorization: Bearer <key> header "
|
|
"or ?api_key= query parameter.", 401)
|
|
|
|
config_mgr = current_app.soulsync["config_manager"]
|
|
stored_keys = config_mgr.get("api_keys", [])
|
|
key_hash = _hash_key(api_key)
|
|
|
|
matched = None
|
|
for stored in stored_keys:
|
|
if stored.get("key_hash") == key_hash:
|
|
matched = stored
|
|
break
|
|
|
|
if not matched:
|
|
return api_error("INVALID_KEY", "Invalid API key.", 403)
|
|
|
|
# Update last-used timestamp (best-effort, throttled to avoid rewriting
|
|
# the full app config on every authenticated request).
|
|
now = datetime.now(timezone.utc)
|
|
matched["last_used_at"] = now.isoformat()
|
|
if _should_persist_usage(key_hash, now):
|
|
config_mgr.set("api_keys", stored_keys)
|
|
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated
|