Handle transient SQLite IO during maintenance

Keep full refresh moving when post-clear VACUUM hits a transient disk I/O error, and retry clear_server_data once when the clear step itself sees the same transient SQLite failure.

Retry metadata cache maintenance writes once on transient disk I/O errors so first-attempt cache jobs do not fail when an immediate retry would succeed.

Tests cover best-effort VACUUM, clear retry behavior, and cache maintenance retry behavior.
pull/673/head
Broque Thomas 4 days ago
parent f1d4f78e0e
commit b9af4ef4ef

@ -8,6 +8,7 @@ Transparent to callers: check cache before API call, store after success.
import json
import logging
import threading
import time
from datetime import datetime
from typing import Optional, Dict, List, Tuple
@ -49,6 +50,33 @@ class MetadataCache:
from database.music_database import get_database
return get_database()
@staticmethod
def _is_transient_sqlite_io_error(exc: Exception) -> bool:
return 'disk i/o error' in str(exc).lower()
def _run_maintenance_write(self, label: str, operation, default: int = 0) -> int:
"""Run a maintenance write with one retry for transient SQLite I/O."""
for attempt in range(2):
try:
db = self._get_db()
conn = db._get_connection()
try:
return operation(conn)
finally:
conn.close()
except Exception as e:
if self._is_transient_sqlite_io_error(e) and attempt == 0:
logger.warning(
"%s hit transient SQLite disk I/O error; retrying once: %s",
label,
e,
)
time.sleep(0.25)
continue
logger.error("%s error: %s", label, e)
return default
return default
# ─── Entity Methods ───────────────────────────────────────────────
def get_entity(self, source: str, entity_type: str, entity_id: str) -> Optional[dict]:
@ -566,137 +594,113 @@ class MetadataCache:
def evict_expired(self) -> int:
"""Delete entries that have exceeded their TTL. Returns count of evicted entries."""
try:
db = self._get_db()
conn = db._get_connection()
try:
cursor = conn.cursor()
# Entities
cursor.execute("""
DELETE FROM metadata_cache_entities
WHERE julianday('now') - julianday(updated_at) > ttl_days
""")
entity_count = cursor.rowcount
# Searches
cursor.execute("""
DELETE FROM metadata_cache_searches
WHERE julianday('now') - julianday(created_at) > ttl_days
""")
search_count = cursor.rowcount
conn.commit()
total = entity_count + search_count
if total > 0:
logger.info(f"Evicted {total} expired cache entries ({entity_count} entities, {search_count} searches)")
return total
finally:
conn.close()
except Exception as e:
logger.error(f"Cache eviction error: {e}")
return 0
def _operation(conn):
cursor = conn.cursor()
# Entities
cursor.execute("""
DELETE FROM metadata_cache_entities
WHERE julianday('now') - julianday(updated_at) > ttl_days
""")
entity_count = cursor.rowcount
# Searches
cursor.execute("""
DELETE FROM metadata_cache_searches
WHERE julianday('now') - julianday(created_at) > ttl_days
""")
search_count = cursor.rowcount
conn.commit()
total = entity_count + search_count
if total > 0:
logger.info(f"Evicted {total} expired cache entries ({entity_count} entities, {search_count} searches)")
return total
return self._run_maintenance_write("Cache eviction", _operation)
def clean_junk_entities(self) -> int:
"""Delete cached entities with empty/placeholder names."""
try:
db = self._get_db()
conn = db._get_connection()
try:
cursor = conn.cursor()
junk_names = "', '".join(self._JUNK_NAMES - {''}) # exclude empty, handled separately
cursor.execute(f"""
DELETE FROM metadata_cache_entities
WHERE (name IS NULL
OR TRIM(name) = ''
OR LOWER(TRIM(name)) IN ('{junk_names}'))
AND entity_id NOT LIKE '%\\_features' ESCAPE '\\'
AND entity_id NOT LIKE '%\\_tracks' ESCAPE '\\'
""")
count = cursor.rowcount
conn.commit()
if count > 0:
logger.info(f"Cleaned {count} junk entities from cache")
return count
finally:
conn.close()
except Exception as e:
logger.error(f"Junk cleanup error: {e}")
return 0
def _operation(conn):
cursor = conn.cursor()
junk_names = "', '".join(self._JUNK_NAMES - {''}) # exclude empty, handled separately
cursor.execute(f"""
DELETE FROM metadata_cache_entities
WHERE (name IS NULL
OR TRIM(name) = ''
OR LOWER(TRIM(name)) IN ('{junk_names}'))
AND entity_id NOT LIKE '%\\_features' ESCAPE '\\'
AND entity_id NOT LIKE '%\\_tracks' ESCAPE '\\'
""")
count = cursor.rowcount
conn.commit()
if count > 0:
logger.info(f"Cleaned {count} junk entities from cache")
return count
return self._run_maintenance_write("Junk cleanup", _operation)
def clean_orphaned_searches(self) -> int:
"""Delete search results where <50% of referenced entities still exist."""
try:
db = self._get_db()
conn = db._get_connection()
try:
cursor = conn.cursor()
cursor.execute("SELECT id, source, search_type, result_ids FROM metadata_cache_searches")
rows = cursor.fetchall()
def _operation(conn):
cursor = conn.cursor()
cursor.execute("SELECT id, source, search_type, result_ids FROM metadata_cache_searches")
rows = cursor.fetchall()
dead_ids = []
for row in rows:
try:
result_ids = json.loads(row['result_ids'] or '[]')
except (json.JSONDecodeError, TypeError):
dead_ids.append(row['id'])
continue
dead_ids = []
for row in rows:
try:
result_ids = json.loads(row['result_ids'] or '[]')
except (json.JSONDecodeError, TypeError):
dead_ids.append(row['id'])
continue
if not result_ids:
dead_ids.append(row['id'])
continue
if not result_ids:
dead_ids.append(row['id'])
continue
# Check how many referenced entities still exist
placeholders = ','.join('?' * len(result_ids))
cursor.execute(f"""
SELECT COUNT(*) FROM metadata_cache_entities
WHERE source = ? AND entity_type = ? AND entity_id IN ({placeholders})
""", [row['source'], row['search_type']] + list(result_ids))
found = cursor.fetchone()[0]
if found < len(result_ids) * 0.5:
dead_ids.append(row['id'])
if dead_ids:
# Delete in chunks to stay under SQLite variable limit
for i in range(0, len(dead_ids), 400):
chunk = dead_ids[i:i + 400]
placeholders = ','.join('?' * len(chunk))
cursor.execute(f"DELETE FROM metadata_cache_searches WHERE id IN ({placeholders})", chunk)
conn.commit()
# Check how many referenced entities still exist
placeholders = ','.join('?' * len(result_ids))
cursor.execute(f"""
SELECT COUNT(*) FROM metadata_cache_entities
WHERE source = ? AND entity_type = ? AND entity_id IN ({placeholders})
""", [row['source'], row['search_type']] + list(result_ids))
found = cursor.fetchone()[0]
if found < len(result_ids) * 0.5:
dead_ids.append(row['id'])
if dead_ids:
# Delete in chunks to stay under SQLite variable limit
for i in range(0, len(dead_ids), 400):
chunk = dead_ids[i:i + 400]
placeholders = ','.join('?' * len(chunk))
cursor.execute(f"DELETE FROM metadata_cache_searches WHERE id IN ({placeholders})", chunk)
conn.commit()
count = len(dead_ids)
if count > 0:
logger.info(f"Cleaned {count} orphaned search results from cache")
return count
finally:
conn.close()
except Exception as e:
logger.error(f"Orphan search cleanup error: {e}")
return 0
count = len(dead_ids)
if count > 0:
logger.info(f"Cleaned {count} orphaned search results from cache")
return count
return self._run_maintenance_write("Orphan search cleanup", _operation)
def clean_stale_musicbrainz_nulls(self, max_age_days: int = 30) -> int:
"""Delete MusicBrainz cache entries where lookup found nothing (null MBID) and age > max_age_days."""
try:
db = self._get_db()
conn = db._get_connection()
try:
cursor = conn.cursor()
cursor.execute("""
DELETE FROM musicbrainz_cache
WHERE musicbrainz_id IS NULL
AND julianday('now') - julianday(last_updated) > ?
""", (max_age_days,))
count = cursor.rowcount
conn.commit()
if count > 0:
logger.info(f"Cleaned {count} stale MusicBrainz null entries (>{max_age_days} days)")
return count
finally:
conn.close()
except Exception as e:
logger.error(f"MusicBrainz null cleanup error: {e}")
return 0
def _operation(conn):
cursor = conn.cursor()
cursor.execute("""
DELETE FROM musicbrainz_cache
WHERE musicbrainz_id IS NULL
AND julianday('now') - julianday(last_updated) > ?
""", (max_age_days,))
count = cursor.rowcount
conn.commit()
if count > 0:
logger.info(f"Cleaned {count} stale MusicBrainz null entries (>{max_age_days} days)")
return count
return self._run_maintenance_write("MusicBrainz null cleanup", _operation)
def get_health_stats(self) -> dict:
"""Return cache health statistics for the repair dashboard.

@ -4591,45 +4591,72 @@ class MusicDatabase:
# VACUUM to actually shrink the database file and reclaim disk space
logger.info("Vacuuming database to reclaim disk space...")
cursor.execute("VACUUM")
self._vacuum_best_effort(cursor)
logger.info("All database data cleared and file compacted")
except Exception as e:
logger.error(f"Error clearing database: {e}")
raise
def _vacuum_best_effort(self, cursor):
"""Run VACUUM without making the caller fail if compaction hiccups."""
try:
cursor.execute("VACUUM")
except Exception as e:
logger.warning(
"Database VACUUM failed after data was already cleared; continuing without compaction: %s",
e,
)
@staticmethod
def _is_transient_sqlite_io_error(exc: Exception) -> bool:
return "disk i/o error" in str(exc).lower()
def clear_server_data(self, server_source: str):
"""Clear data for specific server only (server-aware full refresh)"""
try:
with self._get_connection() as conn:
cursor = conn.cursor()
# Delete only data from the specified server
# Order matters: tracks -> albums -> artists (foreign key constraints)
cursor.execute("DELETE FROM tracks WHERE server_source = ?", (server_source,))
tracks_deleted = cursor.rowcount
cursor.execute("DELETE FROM albums WHERE server_source = ?", (server_source,))
albums_deleted = cursor.rowcount
cursor.execute("DELETE FROM artists WHERE server_source = ?", (server_source,))
artists_deleted = cursor.rowcount
conn.commit()
# Only VACUUM if we deleted a significant amount of data
if tracks_deleted > 1000 or albums_deleted > 100:
logger.info("Vacuuming database to reclaim disk space...")
cursor.execute("VACUUM")
logger.info(f"Cleared {server_source} data: {artists_deleted} artists, {albums_deleted} albums, {tracks_deleted} tracks")
# Note: Watchlist and wishlist are preserved as they are server-agnostic
except Exception as e:
logger.error(f"Error clearing {server_source} database data: {e}")
raise
for attempt in range(2):
try:
with self._get_connection() as conn:
cursor = conn.cursor()
# Delete only data from the specified server
# Order matters: tracks -> albums -> artists (foreign key constraints)
cursor.execute("DELETE FROM tracks WHERE server_source = ?", (server_source,))
tracks_deleted = cursor.rowcount
cursor.execute("DELETE FROM albums WHERE server_source = ?", (server_source,))
albums_deleted = cursor.rowcount
cursor.execute("DELETE FROM artists WHERE server_source = ?", (server_source,))
artists_deleted = cursor.rowcount
conn.commit()
# Only VACUUM if we deleted a significant amount of data
if tracks_deleted > 1000 or albums_deleted > 100:
logger.info("Vacuuming database to reclaim disk space...")
self._vacuum_best_effort(cursor)
logger.info(
f"Cleared {server_source} data: {artists_deleted} artists, "
f"{albums_deleted} albums, {tracks_deleted} tracks"
)
# Note: Watchlist and wishlist are preserved as they are server-agnostic
return
except Exception as e:
if self._is_transient_sqlite_io_error(e) and attempt == 0:
logger.warning(
"Transient disk I/O error clearing %s database data; retrying once: %s",
server_source,
e,
)
time.sleep(0.25)
continue
logger.error(f"Error clearing {server_source} database data: {e}")
raise
def cleanup_orphaned_records(self) -> Dict[str, int]:
"""Remove artists and albums that have no associated tracks"""

@ -0,0 +1,44 @@
import sqlite3
from types import SimpleNamespace
import core.metadata.cache as cache_module
from core.metadata.cache import MetadataCache
def test_maintenance_write_retries_once_after_disk_io(monkeypatch):
cache = MetadataCache()
attempts = []
class _Conn:
def close(self):
pass
monkeypatch.setattr(cache, "_get_db", lambda: SimpleNamespace(_get_connection=lambda: _Conn()))
monkeypatch.setattr(cache_module.time, "sleep", lambda _seconds: None)
def _operation(_conn):
attempts.append(1)
if len(attempts) == 1:
raise sqlite3.OperationalError("disk I/O error")
return 9
assert cache._run_maintenance_write("Cache eviction", _operation) == 9
assert len(attempts) == 2
def test_maintenance_write_does_not_retry_non_io_errors(monkeypatch):
cache = MetadataCache()
attempts = []
class _Conn:
def close(self):
pass
monkeypatch.setattr(cache, "_get_db", lambda: SimpleNamespace(_get_connection=lambda: _Conn()))
def _operation(_conn):
attempts.append(1)
raise sqlite3.OperationalError("syntax error")
assert cache._run_maintenance_write("Cache eviction", _operation) == 0
assert len(attempts) == 1

@ -0,0 +1,98 @@
import sqlite3
from database.music_database import MusicDatabase
def test_clear_server_data_does_not_fail_when_vacuum_hits_disk_io():
db = object.__new__(MusicDatabase)
class _Cursor:
rowcount = 0
def __init__(self):
self.calls = []
def execute(self, query, params=None):
self.calls.append((query, params))
if query == "VACUUM":
raise sqlite3.OperationalError("disk I/O error")
if "tracks" in query:
self.rowcount = 1500
elif "albums" in query:
self.rowcount = 200
elif "artists" in query:
self.rowcount = 20
class _Conn:
def __init__(self):
self.cursor_obj = _Cursor()
self.commits = 0
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def cursor(self):
return self.cursor_obj
def commit(self):
self.commits += 1
conn = _Conn()
db._get_connection = lambda: conn
db.clear_server_data("jellyfin")
assert conn.commits == 1
assert any(call[0] == "VACUUM" for call in conn.cursor_obj.calls)
def test_clear_server_data_retries_transient_disk_io_before_commit(monkeypatch):
db = object.__new__(MusicDatabase)
connections = []
class _Cursor:
rowcount = 0
def __init__(self, fail_first_delete=False):
self.fail_first_delete = fail_first_delete
self.calls = []
def execute(self, query, params=None):
self.calls.append((query, params))
if self.fail_first_delete and "DELETE FROM tracks" in query:
self.fail_first_delete = False
raise sqlite3.OperationalError("disk I/O error")
self.rowcount = 1
class _Conn:
def __init__(self, fail_first_delete=False):
self.cursor_obj = _Cursor(fail_first_delete=fail_first_delete)
self.commits = 0
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def cursor(self):
return self.cursor_obj
def commit(self):
self.commits += 1
def _connect():
conn = _Conn(fail_first_delete=not connections)
connections.append(conn)
return conn
db._get_connection = _connect
monkeypatch.setattr("database.music_database.time.sleep", lambda _seconds: None)
db.clear_server_data("jellyfin")
assert len(connections) == 2
assert connections[1].commits == 1
Loading…
Cancel
Save