From b9af4ef4efa6f4a67113d7c26c27f0e452af80dc Mon Sep 17 00:00:00 2001 From: Broque Thomas <26755000+Nezreka@users.noreply.github.com> Date: Thu, 21 May 2026 17:50:30 -0700 Subject: [PATCH] Handle transient SQLite IO during maintenance Keep full refresh moving when post-clear VACUUM hits a transient disk I/O error, and retry clear_server_data once when the clear step itself sees the same transient SQLite failure. Retry metadata cache maintenance writes once on transient disk I/O errors so first-attempt cache jobs do not fail when an immediate retry would succeed. Tests cover best-effort VACUUM, clear retry behavior, and cache maintenance retry behavior. --- core/metadata/cache.py | 240 +++++++++--------- database/music_database.py | 87 ++++--- .../metadata/test_cache_maintenance_retry.py | 44 ++++ tests/test_database_io_resilience.py | 98 +++++++ 4 files changed, 321 insertions(+), 148 deletions(-) create mode 100644 tests/metadata/test_cache_maintenance_retry.py create mode 100644 tests/test_database_io_resilience.py diff --git a/core/metadata/cache.py b/core/metadata/cache.py index 2974c5eb..b49daae5 100644 --- a/core/metadata/cache.py +++ b/core/metadata/cache.py @@ -8,6 +8,7 @@ Transparent to callers: check cache before API call, store after success. import json import logging import threading +import time from datetime import datetime from typing import Optional, Dict, List, Tuple @@ -49,6 +50,33 @@ class MetadataCache: from database.music_database import get_database return get_database() + @staticmethod + def _is_transient_sqlite_io_error(exc: Exception) -> bool: + return 'disk i/o error' in str(exc).lower() + + def _run_maintenance_write(self, label: str, operation, default: int = 0) -> int: + """Run a maintenance write with one retry for transient SQLite I/O.""" + for attempt in range(2): + try: + db = self._get_db() + conn = db._get_connection() + try: + return operation(conn) + finally: + conn.close() + except Exception as e: + if self._is_transient_sqlite_io_error(e) and attempt == 0: + logger.warning( + "%s hit transient SQLite disk I/O error; retrying once: %s", + label, + e, + ) + time.sleep(0.25) + continue + logger.error("%s error: %s", label, e) + return default + return default + # ─── Entity Methods ─────────────────────────────────────────────── def get_entity(self, source: str, entity_type: str, entity_id: str) -> Optional[dict]: @@ -566,137 +594,113 @@ class MetadataCache: def evict_expired(self) -> int: """Delete entries that have exceeded their TTL. Returns count of evicted entries.""" - try: - db = self._get_db() - conn = db._get_connection() - try: - cursor = conn.cursor() - - # Entities - cursor.execute(""" - DELETE FROM metadata_cache_entities - WHERE julianday('now') - julianday(updated_at) > ttl_days - """) - entity_count = cursor.rowcount - - # Searches - cursor.execute(""" - DELETE FROM metadata_cache_searches - WHERE julianday('now') - julianday(created_at) > ttl_days - """) - search_count = cursor.rowcount - - conn.commit() - total = entity_count + search_count - if total > 0: - logger.info(f"Evicted {total} expired cache entries ({entity_count} entities, {search_count} searches)") - return total - finally: - conn.close() - except Exception as e: - logger.error(f"Cache eviction error: {e}") - return 0 + def _operation(conn): + cursor = conn.cursor() + + # Entities + cursor.execute(""" + DELETE FROM metadata_cache_entities + WHERE julianday('now') - julianday(updated_at) > ttl_days + """) + entity_count = cursor.rowcount + + # Searches + cursor.execute(""" + DELETE FROM metadata_cache_searches + WHERE julianday('now') - julianday(created_at) > ttl_days + """) + search_count = cursor.rowcount + + conn.commit() + total = entity_count + search_count + if total > 0: + logger.info(f"Evicted {total} expired cache entries ({entity_count} entities, {search_count} searches)") + return total + + return self._run_maintenance_write("Cache eviction", _operation) def clean_junk_entities(self) -> int: """Delete cached entities with empty/placeholder names.""" - try: - db = self._get_db() - conn = db._get_connection() - try: - cursor = conn.cursor() - junk_names = "', '".join(self._JUNK_NAMES - {''}) # exclude empty, handled separately - cursor.execute(f""" - DELETE FROM metadata_cache_entities - WHERE (name IS NULL - OR TRIM(name) = '' - OR LOWER(TRIM(name)) IN ('{junk_names}')) - AND entity_id NOT LIKE '%\\_features' ESCAPE '\\' - AND entity_id NOT LIKE '%\\_tracks' ESCAPE '\\' - """) - count = cursor.rowcount - conn.commit() - if count > 0: - logger.info(f"Cleaned {count} junk entities from cache") - return count - finally: - conn.close() - except Exception as e: - logger.error(f"Junk cleanup error: {e}") - return 0 + def _operation(conn): + cursor = conn.cursor() + junk_names = "', '".join(self._JUNK_NAMES - {''}) # exclude empty, handled separately + cursor.execute(f""" + DELETE FROM metadata_cache_entities + WHERE (name IS NULL + OR TRIM(name) = '' + OR LOWER(TRIM(name)) IN ('{junk_names}')) + AND entity_id NOT LIKE '%\\_features' ESCAPE '\\' + AND entity_id NOT LIKE '%\\_tracks' ESCAPE '\\' + """) + count = cursor.rowcount + conn.commit() + if count > 0: + logger.info(f"Cleaned {count} junk entities from cache") + return count + + return self._run_maintenance_write("Junk cleanup", _operation) def clean_orphaned_searches(self) -> int: """Delete search results where <50% of referenced entities still exist.""" - try: - db = self._get_db() - conn = db._get_connection() - try: - cursor = conn.cursor() - cursor.execute("SELECT id, source, search_type, result_ids FROM metadata_cache_searches") - rows = cursor.fetchall() + def _operation(conn): + cursor = conn.cursor() + cursor.execute("SELECT id, source, search_type, result_ids FROM metadata_cache_searches") + rows = cursor.fetchall() - dead_ids = [] - for row in rows: - try: - result_ids = json.loads(row['result_ids'] or '[]') - except (json.JSONDecodeError, TypeError): - dead_ids.append(row['id']) - continue + dead_ids = [] + for row in rows: + try: + result_ids = json.loads(row['result_ids'] or '[]') + except (json.JSONDecodeError, TypeError): + dead_ids.append(row['id']) + continue - if not result_ids: - dead_ids.append(row['id']) - continue + if not result_ids: + dead_ids.append(row['id']) + continue - # Check how many referenced entities still exist - placeholders = ','.join('?' * len(result_ids)) - cursor.execute(f""" - SELECT COUNT(*) FROM metadata_cache_entities - WHERE source = ? AND entity_type = ? AND entity_id IN ({placeholders}) - """, [row['source'], row['search_type']] + list(result_ids)) - found = cursor.fetchone()[0] - - if found < len(result_ids) * 0.5: - dead_ids.append(row['id']) - - if dead_ids: - # Delete in chunks to stay under SQLite variable limit - for i in range(0, len(dead_ids), 400): - chunk = dead_ids[i:i + 400] - placeholders = ','.join('?' * len(chunk)) - cursor.execute(f"DELETE FROM metadata_cache_searches WHERE id IN ({placeholders})", chunk) - conn.commit() + # Check how many referenced entities still exist + placeholders = ','.join('?' * len(result_ids)) + cursor.execute(f""" + SELECT COUNT(*) FROM metadata_cache_entities + WHERE source = ? AND entity_type = ? AND entity_id IN ({placeholders}) + """, [row['source'], row['search_type']] + list(result_ids)) + found = cursor.fetchone()[0] + + if found < len(result_ids) * 0.5: + dead_ids.append(row['id']) + + if dead_ids: + # Delete in chunks to stay under SQLite variable limit + for i in range(0, len(dead_ids), 400): + chunk = dead_ids[i:i + 400] + placeholders = ','.join('?' * len(chunk)) + cursor.execute(f"DELETE FROM metadata_cache_searches WHERE id IN ({placeholders})", chunk) + conn.commit() - count = len(dead_ids) - if count > 0: - logger.info(f"Cleaned {count} orphaned search results from cache") - return count - finally: - conn.close() - except Exception as e: - logger.error(f"Orphan search cleanup error: {e}") - return 0 + count = len(dead_ids) + if count > 0: + logger.info(f"Cleaned {count} orphaned search results from cache") + return count + + return self._run_maintenance_write("Orphan search cleanup", _operation) def clean_stale_musicbrainz_nulls(self, max_age_days: int = 30) -> int: """Delete MusicBrainz cache entries where lookup found nothing (null MBID) and age > max_age_days.""" - try: - db = self._get_db() - conn = db._get_connection() - try: - cursor = conn.cursor() - cursor.execute(""" - DELETE FROM musicbrainz_cache - WHERE musicbrainz_id IS NULL - AND julianday('now') - julianday(last_updated) > ? - """, (max_age_days,)) - count = cursor.rowcount - conn.commit() - if count > 0: - logger.info(f"Cleaned {count} stale MusicBrainz null entries (>{max_age_days} days)") - return count - finally: - conn.close() - except Exception as e: - logger.error(f"MusicBrainz null cleanup error: {e}") - return 0 + def _operation(conn): + cursor = conn.cursor() + cursor.execute(""" + DELETE FROM musicbrainz_cache + WHERE musicbrainz_id IS NULL + AND julianday('now') - julianday(last_updated) > ? + """, (max_age_days,)) + count = cursor.rowcount + conn.commit() + if count > 0: + logger.info(f"Cleaned {count} stale MusicBrainz null entries (>{max_age_days} days)") + return count + + return self._run_maintenance_write("MusicBrainz null cleanup", _operation) def get_health_stats(self) -> dict: """Return cache health statistics for the repair dashboard. diff --git a/database/music_database.py b/database/music_database.py index c6980623..0171d88e 100644 --- a/database/music_database.py +++ b/database/music_database.py @@ -4591,45 +4591,72 @@ class MusicDatabase: # VACUUM to actually shrink the database file and reclaim disk space logger.info("Vacuuming database to reclaim disk space...") - cursor.execute("VACUUM") + self._vacuum_best_effort(cursor) logger.info("All database data cleared and file compacted") except Exception as e: logger.error(f"Error clearing database: {e}") raise + + def _vacuum_best_effort(self, cursor): + """Run VACUUM without making the caller fail if compaction hiccups.""" + try: + cursor.execute("VACUUM") + except Exception as e: + logger.warning( + "Database VACUUM failed after data was already cleared; continuing without compaction: %s", + e, + ) + + @staticmethod + def _is_transient_sqlite_io_error(exc: Exception) -> bool: + return "disk i/o error" in str(exc).lower() def clear_server_data(self, server_source: str): """Clear data for specific server only (server-aware full refresh)""" - try: - with self._get_connection() as conn: - cursor = conn.cursor() - - # Delete only data from the specified server - # Order matters: tracks -> albums -> artists (foreign key constraints) - cursor.execute("DELETE FROM tracks WHERE server_source = ?", (server_source,)) - tracks_deleted = cursor.rowcount - - cursor.execute("DELETE FROM albums WHERE server_source = ?", (server_source,)) - albums_deleted = cursor.rowcount - - cursor.execute("DELETE FROM artists WHERE server_source = ?", (server_source,)) - artists_deleted = cursor.rowcount - - conn.commit() - - # Only VACUUM if we deleted a significant amount of data - if tracks_deleted > 1000 or albums_deleted > 100: - logger.info("Vacuuming database to reclaim disk space...") - cursor.execute("VACUUM") - - logger.info(f"Cleared {server_source} data: {artists_deleted} artists, {albums_deleted} albums, {tracks_deleted} tracks") - - # Note: Watchlist and wishlist are preserved as they are server-agnostic - - except Exception as e: - logger.error(f"Error clearing {server_source} database data: {e}") - raise + for attempt in range(2): + try: + with self._get_connection() as conn: + cursor = conn.cursor() + + # Delete only data from the specified server + # Order matters: tracks -> albums -> artists (foreign key constraints) + cursor.execute("DELETE FROM tracks WHERE server_source = ?", (server_source,)) + tracks_deleted = cursor.rowcount + + cursor.execute("DELETE FROM albums WHERE server_source = ?", (server_source,)) + albums_deleted = cursor.rowcount + + cursor.execute("DELETE FROM artists WHERE server_source = ?", (server_source,)) + artists_deleted = cursor.rowcount + + conn.commit() + + # Only VACUUM if we deleted a significant amount of data + if tracks_deleted > 1000 or albums_deleted > 100: + logger.info("Vacuuming database to reclaim disk space...") + self._vacuum_best_effort(cursor) + + logger.info( + f"Cleared {server_source} data: {artists_deleted} artists, " + f"{albums_deleted} albums, {tracks_deleted} tracks" + ) + + # Note: Watchlist and wishlist are preserved as they are server-agnostic + return + + except Exception as e: + if self._is_transient_sqlite_io_error(e) and attempt == 0: + logger.warning( + "Transient disk I/O error clearing %s database data; retrying once: %s", + server_source, + e, + ) + time.sleep(0.25) + continue + logger.error(f"Error clearing {server_source} database data: {e}") + raise def cleanup_orphaned_records(self) -> Dict[str, int]: """Remove artists and albums that have no associated tracks""" diff --git a/tests/metadata/test_cache_maintenance_retry.py b/tests/metadata/test_cache_maintenance_retry.py new file mode 100644 index 00000000..dc5ee85c --- /dev/null +++ b/tests/metadata/test_cache_maintenance_retry.py @@ -0,0 +1,44 @@ +import sqlite3 +from types import SimpleNamespace + +import core.metadata.cache as cache_module +from core.metadata.cache import MetadataCache + + +def test_maintenance_write_retries_once_after_disk_io(monkeypatch): + cache = MetadataCache() + attempts = [] + + class _Conn: + def close(self): + pass + + monkeypatch.setattr(cache, "_get_db", lambda: SimpleNamespace(_get_connection=lambda: _Conn())) + monkeypatch.setattr(cache_module.time, "sleep", lambda _seconds: None) + + def _operation(_conn): + attempts.append(1) + if len(attempts) == 1: + raise sqlite3.OperationalError("disk I/O error") + return 9 + + assert cache._run_maintenance_write("Cache eviction", _operation) == 9 + assert len(attempts) == 2 + + +def test_maintenance_write_does_not_retry_non_io_errors(monkeypatch): + cache = MetadataCache() + attempts = [] + + class _Conn: + def close(self): + pass + + monkeypatch.setattr(cache, "_get_db", lambda: SimpleNamespace(_get_connection=lambda: _Conn())) + + def _operation(_conn): + attempts.append(1) + raise sqlite3.OperationalError("syntax error") + + assert cache._run_maintenance_write("Cache eviction", _operation) == 0 + assert len(attempts) == 1 diff --git a/tests/test_database_io_resilience.py b/tests/test_database_io_resilience.py new file mode 100644 index 00000000..37da1cc7 --- /dev/null +++ b/tests/test_database_io_resilience.py @@ -0,0 +1,98 @@ +import sqlite3 + +from database.music_database import MusicDatabase + + +def test_clear_server_data_does_not_fail_when_vacuum_hits_disk_io(): + db = object.__new__(MusicDatabase) + + class _Cursor: + rowcount = 0 + + def __init__(self): + self.calls = [] + + def execute(self, query, params=None): + self.calls.append((query, params)) + if query == "VACUUM": + raise sqlite3.OperationalError("disk I/O error") + if "tracks" in query: + self.rowcount = 1500 + elif "albums" in query: + self.rowcount = 200 + elif "artists" in query: + self.rowcount = 20 + + class _Conn: + def __init__(self): + self.cursor_obj = _Cursor() + self.commits = 0 + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def cursor(self): + return self.cursor_obj + + def commit(self): + self.commits += 1 + + conn = _Conn() + db._get_connection = lambda: conn + + db.clear_server_data("jellyfin") + + assert conn.commits == 1 + assert any(call[0] == "VACUUM" for call in conn.cursor_obj.calls) + + +def test_clear_server_data_retries_transient_disk_io_before_commit(monkeypatch): + db = object.__new__(MusicDatabase) + connections = [] + + class _Cursor: + rowcount = 0 + + def __init__(self, fail_first_delete=False): + self.fail_first_delete = fail_first_delete + self.calls = [] + + def execute(self, query, params=None): + self.calls.append((query, params)) + if self.fail_first_delete and "DELETE FROM tracks" in query: + self.fail_first_delete = False + raise sqlite3.OperationalError("disk I/O error") + self.rowcount = 1 + + class _Conn: + def __init__(self, fail_first_delete=False): + self.cursor_obj = _Cursor(fail_first_delete=fail_first_delete) + self.commits = 0 + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def cursor(self): + return self.cursor_obj + + def commit(self): + self.commits += 1 + + def _connect(): + conn = _Conn(fail_first_delete=not connections) + connections.append(conn) + return conn + + db._get_connection = _connect + monkeypatch.setattr("database.music_database.time.sleep", lambda _seconds: None) + + db.clear_server_data("jellyfin") + + assert len(connections) == 2 + assert connections[1].commits == 1