# SoulSync/api/cache.py (207 lines, 7.4 KiB)

"""
Cache endpoints — browse MusicBrainz and discovery match caches.
"""
import json
from flask import request
from database.music_database import get_database
from .auth import require_api_key
from .helpers import api_success, api_error, parse_pagination, build_pagination
def _decode_json_field(entry, field):
    """Parse ``entry[field]`` in place when it holds a JSON string.

    Values that are empty, not strings, or not valid JSON are left untouched
    so that one malformed cache row never fails a whole listing.

    Args:
        entry: Mutable mapping for a single cache row.
        field: Key whose value may contain JSON text.

    Returns:
        The same ``entry`` object, to allow use inside expressions.
    """
    raw = entry.get(field)
    if raw and isinstance(raw, str):
        try:
            entry[field] = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            # Deliberate best effort: keep the raw string exactly as stored.
            pass
    return entry


def _fetch_cache_page(table, order_column, json_field, where_parts, params, page, limit):
    """Return ``(entries, total)`` for one page of a cache table.

    Args:
        table: Table name. It is interpolated into the SQL text, so it must
            be a constant chosen by the caller, never request data.
        order_column: Column for ``ORDER BY ... DESC`` (also a constant).
        json_field: Column whose JSON text is decoded into Python objects.
        where_parts: SQL conditions using ``?`` placeholders; joined with AND.
        params: Values bound, in order, to the placeholders in ``where_parts``.
        page: Page number; page 1 maps to offset 0.
        limit: Maximum number of rows returned.

    Returns:
        A tuple of the decoded row dicts for the requested page and the total
        number of rows matching the filters.
    """
    where_clause = f"WHERE {' AND '.join(where_parts)}" if where_parts else ""

    db = get_database()
    conn = db._get_connection()
    cursor = conn.cursor()

    # NOTE(review): rows are read by column name, which assumes the connection
    # is configured with a mapping-style row factory — confirm in the database layer.
    cursor.execute(f"SELECT COUNT(*) as cnt FROM {table} {where_clause}", params)
    total = cursor.fetchone()["cnt"]

    offset = (page - 1) * limit
    cursor.execute(f"""
        SELECT * FROM {table}
        {where_clause}
        ORDER BY {order_column} DESC
        LIMIT ? OFFSET ?
    """, params + [limit, offset])

    entries = [_decode_json_field(dict(row), json_field) for row in cursor.fetchall()]
    return entries, total


def register_routes(bp):
    """Attach the cache browsing endpoints to ``bp``.

    Registers four GET routes, each wrapped with ``require_api_key``:

    * ``/cache/musicbrainz`` and ``/cache/musicbrainz/stats``
    * ``/cache/discovery-matches`` and ``/cache/discovery-matches/stats``

    All handlers only issue ``SELECT`` statements. Any exception raised while
    querying is converted into a ``CACHE_ERROR`` response with status 500.

    Args:
        bp: Object exposing a Flask-style ``route()`` decorator.
    """

    # ── MusicBrainz Cache ──────────────────────────────────────

    @bp.route("/cache/musicbrainz", methods=["GET"])
    @require_api_key
    def list_musicbrainz_cache():
        """List cached MusicBrainz lookups.

        Query params:
            entity_type: Filter by type ('artist', 'album', 'track')
            search: Filter by entity_name
            page: Page number
            limit: Items per page
        """
        page, limit = parse_pagination(request)
        entity_type = request.args.get("entity_type")
        search = request.args.get("search", "").strip()

        where_parts = []
        params = []
        if entity_type:
            where_parts.append("entity_type = ?")
            params.append(entity_type)
        if search:
            where_parts.append("LOWER(entity_name) LIKE LOWER(?)")
            params.append(f"%{search}%")

        try:
            entries, total = _fetch_cache_page(
                "musicbrainz_cache",
                "last_updated",
                "metadata_json",
                where_parts,
                params,
                page,
                limit,
            )
            return api_success(
                {"entries": entries},
                pagination=build_pagination(page, limit, total),
            )
        except Exception as e:
            return api_error("CACHE_ERROR", str(e), 500)

    @bp.route("/cache/musicbrainz/stats", methods=["GET"])
    @require_api_key
    def musicbrainz_cache_stats():
        """Get MusicBrainz cache statistics."""
        try:
            db = get_database()
            conn = db._get_connection()
            cursor = conn.cursor()

            # COUNT(column) skips NULLs, so a single scan yields both the total
            # and the matched count, keeping "unmatched" consistent with them.
            cursor.execute("""
                SELECT COUNT(*) as total, COUNT(musicbrainz_id) as matched
                FROM musicbrainz_cache
            """)
            totals = cursor.fetchone()
            total = totals["total"]
            matched = totals["matched"]

            cursor.execute("""
                SELECT entity_type, COUNT(*) as count
                FROM musicbrainz_cache
                GROUP BY entity_type
                ORDER BY count DESC
            """)
            by_type = {row["entity_type"]: row["count"] for row in cursor.fetchall()}

            return api_success({
                "total": total,
                "matched": matched,
                "unmatched": total - matched,
                "by_type": by_type,
            })
        except Exception as e:
            return api_error("CACHE_ERROR", str(e), 500)

    # ── Discovery Match Cache ──────────────────────────────────

    @bp.route("/cache/discovery-matches", methods=["GET"])
    @require_api_key
    def list_discovery_match_cache():
        """List cached discovery provider matches.

        Query params:
            provider: Filter by provider ('spotify', 'itunes', etc.)
            search: Filter by title or artist
            page: Page number
            limit: Items per page
        """
        page, limit = parse_pagination(request)
        provider = request.args.get("provider")
        search = request.args.get("search", "").strip()

        where_parts = []
        params = []
        if provider:
            where_parts.append("provider = ?")
            params.append(provider)
        if search:
            where_parts.append(
                "(LOWER(original_title) LIKE LOWER(?) OR LOWER(original_artist) LIKE LOWER(?))"
            )
            params.extend([f"%{search}%", f"%{search}%"])

        try:
            entries, total = _fetch_cache_page(
                "discovery_match_cache",
                "last_used_at",
                "matched_data_json",
                where_parts,
                params,
                page,
                limit,
            )
            return api_success(
                {"entries": entries},
                pagination=build_pagination(page, limit, total),
            )
        except Exception as e:
            return api_error("CACHE_ERROR", str(e), 500)

    @bp.route("/cache/discovery-matches/stats", methods=["GET"])
    @require_api_key
    def discovery_match_cache_stats():
        """Get discovery match cache statistics."""
        try:
            db = get_database()
            conn = db._get_connection()
            cursor = conn.cursor()

            cursor.execute("""
                SELECT COUNT(*) as total,
                       SUM(use_count) as total_uses,
                       AVG(match_confidence) as avg_confidence
                FROM discovery_match_cache
            """)
            totals = cursor.fetchone()
            total = totals["total"]
            # SUM() is NULL when there are no non-NULL inputs; report that as 0.
            total_uses = totals["total_uses"] or 0
            avg_confidence = totals["avg_confidence"]

            cursor.execute("""
                SELECT provider, COUNT(*) as count
                FROM discovery_match_cache
                GROUP BY provider
                ORDER BY count DESC
            """)
            by_provider = {row["provider"]: row["count"] for row in cursor.fetchall()}

            return api_success({
                "total": total,
                "total_uses": total_uses,
                # Compare against None explicitly: an average of exactly 0.0 is
                # a real value and must not be reported as "no data".
                "avg_confidence": (
                    round(avg_confidence, 3) if avg_confidence is not None else None
                ),
                "by_provider": by_provider,
            })
        except Exception as e:
            return api_error("CACHE_ERROR", str(e), 500)