Add deep library scan automation for enrichment-safe sync

pull/253/head
Broque Thomas 2 months ago
parent ddd7f2d9b5
commit 8b3b82702a

@ -84,6 +84,13 @@ SYSTEM_AUTOMATIONS = [
'action_type': 'clean_completed_downloads',
'initial_delay': 300,
},
{
'name': 'Auto-Deep Scan Library',
'trigger_type': 'schedule',
'trigger_config': {'interval': 7, 'unit': 'days'},
'action_type': 'deep_scan_library',
'initial_delay': 900, # 15 min after startup
},
]

@ -289,6 +289,144 @@ class DatabaseUpdateWorker(QThread):
logger.error(f"Database update failed: {str(e)}")
self._emit_signal('error', f"Database update failed: {str(e)}")
def run_deep_scan(self):
    """Enrichment-preserving deep library scan.

    Fetches every artist from the media server, inserts only tracks that are
    not yet in the database, and removes tracks the server no longer reports.
    clear_server_data() is never called, so enrichment data attached to
    existing tracks survives the scan.
    """
    try:
        # Open (or create) the database handle for this worker
        self.database = get_database(self.database_path)
        logger.info(f"Starting deep library scan for {self.server_type}")
        self._emit_signal('phase_changed', "Deep scan: Connecting to media server...")

        # Phase 1: cache preparation for Jellyfin/Navidrome (mirrors full refresh)
        if self.server_type in ("jellyfin", "navidrome"):
            prep_msg = ("Deep scan: Preparing Jellyfin cache..."
                        if self.server_type == "jellyfin"
                        else "Deep scan: Connecting to Navidrome...")
            self._emit_signal('phase_changed', prep_msg)
            if hasattr(self.media_client, 'set_progress_callback'):
                self.media_client.set_progress_callback(
                    lambda msg: self._emit_signal('phase_changed', msg))

        # Fetch ALL artists from the server (does NOT clear server data)
        all_artists = self._get_all_artists()
        if not all_artists:
            self._emit_signal('error', f"Deep scan: No artists found in {self.server_type} library")
            return
        logger.info(f"Deep scan: Found {len(all_artists)} artists in {self.server_type} library")

        # Phase 2: walk the whole library, skipping known tracks while
        # accumulating every track ID the server still reports.
        self._emit_signal('phase_changed', "Deep scan: Processing library content...")
        seen_track_ids = set()
        self._deep_scan_process_all_artists(all_artists, seen_track_ids)

        # Phase 3: delete DB tracks the server no longer has
        self._emit_signal('phase_changed', "Deep scan: Checking for stale tracks...")
        db_track_ids = self.database.get_all_track_ids_for_server(self.server_type)
        stale = db_track_ids - seen_track_ids
        stale_removed = 0
        if stale:
            # Safety valve: a stale set covering >50% of a non-trivial
            # (>100 track) library most likely means the API silently
            # failed mid-scan, so skip removal rather than gut the DB.
            if len(stale) > len(db_track_ids) * 0.5 and len(db_track_ids) > 100:
                logger.warning(f"Deep scan safety: {len(stale)} stale tracks ({len(stale)}/{len(db_track_ids)} = "
                               f"{len(stale)/len(db_track_ids)*100:.0f}%) exceeds 50% threshold — skipping removal")
            else:
                logger.info(f"Deep scan: Removing {len(stale)} stale tracks from database")
                stale_removed = self.database.delete_stale_tracks(stale, self.server_type)

        # Phase 4: best-effort housekeeping (failures only logged)
        self._emit_signal('phase_changed', "Deep scan: Cleaning up orphaned records...")
        try:
            cleanup_stats = self.database.cleanup_orphaned_records()
            orphaned_artists = cleanup_stats.get('orphaned_artists_removed', 0)
            orphaned_albums = cleanup_stats.get('orphaned_albums_removed', 0)
            if orphaned_artists > 0 or orphaned_albums > 0:
                logger.info(f"Deep scan cleanup: {orphaned_artists} orphaned artists, {orphaned_albums} orphaned albums removed")
        except Exception as e:
            logger.warning(f"Deep scan: Could not cleanup orphaned records: {e}")
        try:
            merge_stats = self.database.merge_duplicate_artists()
            merged = merge_stats.get('artists_merged', 0)
            if merged > 0:
                logger.info(f"Deep scan: Merged {merged} duplicate artists")
        except Exception as e:
            logger.warning(f"Deep scan: Could not merge duplicate artists: {e}")

        # Drop the media client cache so the next sync sees fresh data
        if self.server_type in ["jellyfin", "navidrome"]:
            try:
                self.media_client.clear_cache()
                logger.info(f"Deep scan: Cleared {self.server_type} cache")
            except Exception as e:
                logger.warning(f"Deep scan: Could not clear cache: {e}")

        # Removal counters consumed by the finished callback; deep scan
        # only ever removes tracks directly (artists/albums go through
        # the orphan cleanup above).
        self.removed_artists = 0
        self.removed_albums = 0
        self.removed_tracks = stale_removed

        # Phase 5: report completion
        logger.info(f"Deep scan completed: {self.processed_artists} artists, "
                    f"{self.processed_albums} albums, {self.processed_tracks} new tracks, "
                    f"{stale_removed} stale tracks removed")
        self._emit_signal('finished',
                          self.processed_artists,
                          self.processed_albums,
                          self.processed_tracks,
                          self.successful_operations,
                          self.failed_operations)
    except Exception as e:
        logger.error(f"Deep scan failed: {str(e)}")
        self._emit_signal('error', f"Deep scan failed: {str(e)}")
def _deep_scan_process_all_artists(self, artists: List, seen_track_ids: set):
"""Process all artists sequentially for deep scan — skips existing tracks, collects seen IDs.

Args:
    artists: Artist objects fetched from the media server.
    seen_track_ids: Set mutated in place; every track ID the server
        reports is added to it so the caller can compute stale tracks.
"""
total_artists = len(artists)
logger.info(f"Deep scan: Processing {total_artists} artists (sequential, skip-existing mode)")
for i, artist in enumerate(artists):
# Honor cooperative cancellation between artists
if self.should_stop:
break
artist_name = getattr(artist, 'title', 'Unknown Artist')
# Shared counters are guarded by the worker's lock (other workers may touch them)
with self.thread_lock:
self.processed_artists += 1
progress_percent = (self.processed_artists / total_artists) * 100
self._emit_signal('progress_updated',
f"Deep scan: {artist_name}",
self.processed_artists,
total_artists,
progress_percent
)
try:
# skip_existing_tracks=True preserves enrichment on tracks already in the DB;
# seen_track_ids records every server-side track ID for stale detection
success, details, album_count, track_count = self._process_artist_with_content(
artist, skip_existing_tracks=True, seen_track_ids=seen_track_ids
)
with self.thread_lock:
if success:
self.successful_operations += 1
else:
self.failed_operations += 1
self.processed_albums += album_count
self.processed_tracks += track_count
self._emit_signal('artist_processed', artist_name, success, details, album_count, track_count)
except Exception as e:
# A failure on one artist is reported but never aborts the scan
logger.error(f"Deep scan: Error processing artist {artist_name}: {e}")
self._emit_signal('artist_processed', artist_name, False, f"Error: {str(e)}", 0, 0)
def _get_all_artists(self) -> List:
"""Get all artists from media server library"""
try:
@ -1156,74 +1294,94 @@ class DatabaseUpdateWorker(QThread):
# Emit progress signal
self._emit_signal('artist_processed', artist_name, success, details, album_count, track_count)
def _process_artist_with_content(self, media_artist) -> tuple[bool, str, int, int]:
"""Process an artist and all their albums and tracks with optimized API usage"""
def _process_artist_with_content(self, media_artist, skip_existing_tracks=False, seen_track_ids=None) -> tuple[bool, str, int, int]:
"""Process an artist and all their albums and tracks with optimized API usage.
Args:
skip_existing_tracks: If True, skip tracks already in the DB (deep scan mode)
seen_track_ids: If provided, collect all server track IDs into this set (deep scan mode)
"""
try:
artist_name = getattr(media_artist, 'title', 'Unknown Artist')
# 1. Insert/update the artist using server-agnostic method
artist_success = self.database.insert_or_update_media_artist(media_artist, server_source=self.server_type)
if not artist_success:
return False, "Failed to update artist data", 0, 0
artist_id = str(media_artist.ratingKey)
# 2. Get all albums for this artist (cached from aggressive pre-population)
try:
albums = list(media_artist.albums())
except Exception as e:
logger.warning(f"Could not get albums for artist '{artist_name}': {e}")
return True, "Artist updated (no albums accessible)", 0, 0
album_count = 0
track_count = 0
skipped_count = 0
# 3. Process albums in smaller batches to reduce memory usage
batch_size = 10 # Process 10 albums at a time
for i in range(0, len(albums), batch_size):
if self.should_stop:
break
album_batch = albums[i:i + batch_size]
for album in album_batch:
if self.should_stop:
break
try:
# Insert/update album using server-agnostic method
album_success = self.database.insert_or_update_media_album(album, artist_id, server_source=self.server_type)
if album_success:
album_count += 1
album_id = str(album.ratingKey)
# 4. Process tracks in this album (cached from aggressive pre-population)
try:
tracks = list(album.tracks())
# Batch insert tracks for better database performance
track_batch = []
for track in tracks:
if self.should_stop:
break
track_batch.append((track, album_id, artist_id))
# Process track batch
for track, alb_id, art_id in track_batch:
try:
track_id_str = str(track.ratingKey)
# Deep scan: collect all server track IDs
if seen_track_ids is not None:
seen_track_ids.add(track_id_str)
# Deep scan: skip tracks already in DB to preserve enrichment
if skip_existing_tracks and self.database.track_exists_by_server(track_id_str, self.server_type):
skipped_count += 1
continue
track_success = self.database.insert_or_update_media_track(track, alb_id, art_id, server_source=self.server_type)
if track_success:
track_count += 1
except Exception as e:
logger.warning(f"Failed to process track '{getattr(track, 'title', 'Unknown')}': {e}")
except Exception as e:
logger.warning(f"Could not get tracks for album '{getattr(album, 'title', 'Unknown')}': {e}")
except Exception as e:
logger.warning(f"Failed to process album '{getattr(album, 'title', 'Unknown')}': {e}")
details = f"Updated with {album_count} albums, {track_count} tracks"
if skip_existing_tracks:
details = f"{album_count} albums, {track_count} new tracks ({skipped_count} existing skipped)"
else:
details = f"Updated with {album_count} albums, {track_count} tracks"
return True, details, album_count, track_count
except Exception as e:

@ -2419,6 +2419,49 @@ class MusicDatabase:
logger.error(f"Error getting album IDs for {server_source}: {e}")
return set()
def get_all_track_ids_for_server(self, server_source: str) -> set:
    """Return the set of all track IDs stored for *server_source*.

    Returns an empty set on any database error (the error is logged,
    never raised to the caller).
    """
    try:
        with self._get_connection() as conn:
            rows = conn.cursor().execute(
                "SELECT id FROM tracks WHERE server_source = ?",
                (server_source,),
            ).fetchall()
        return {track_id for (track_id,) in rows}
    except Exception as e:
        logger.error(f"Error getting track IDs for {server_source}: {e}")
        return set()
def delete_stale_tracks(self, stale_track_ids: set, server_source: str) -> int:
    """Delete tracks (by ID + server_source) that vanished from the media server.

    IDs are deleted in batches of 500 so each statement's parameter list
    stays within safe SQLite limits. Returns the number of rows removed,
    or 0 on error (logged, never raised).
    """
    try:
        batch_size = 500
        pending = list(stale_track_ids)
        removed = 0
        with self._get_connection() as conn:
            cursor = conn.cursor()
            for start in range(0, len(pending), batch_size):
                chunk = pending[start:start + batch_size]
                marks = ','.join(['?'] * len(chunk))
                cursor.execute(
                    f"DELETE FROM tracks WHERE id IN ({marks}) AND server_source = ?",
                    chunk + [server_source])
                removed += cursor.rowcount
                # Commit per batch so a long removal can't hold one huge transaction
                conn.commit()
        if removed > 0:
            logger.info(f"Deep scan stale removal for {server_source}: "
                        f"{removed} tracks removed")
        return removed
    except Exception as e:
        logger.error(f"Error deleting stale tracks for {server_source}: {e}")
        return 0
def delete_removed_content(self, removed_artist_ids: set, removed_album_ids: set,
server_source: str):
"""Delete artists and albums that were removed from the media server.

@ -661,6 +661,50 @@ def _register_automation_handlers():
return {'status': 'error', 'reason': db_update_state.get('error_message', 'Unknown error'), '_manages_own_progress': True}
return {'status': 'completed', 'full_refresh': str(full), '_manages_own_progress': True}
def _auto_deep_scan_library(config):
    """Automation action: run an enrichment-safe deep library scan.

    Submits _run_deep_scan_task for the active media server and blocks,
    polling db_update_state, until the scan finishes, errors out, or the
    2-hour timeout elapses. Progress cards are driven by the worker
    callbacks; this handler only emits stall warnings and the final status.

    Returns an automation result dict with '_manages_own_progress' set.
    """
    global _db_update_automation_id
    automation_id = config.get('_automation_id')
    # Fix: read shared state under db_update_lock, as every other access in
    # this function does, to avoid racing a concurrently-starting update.
    with db_update_lock:
        already_running = db_update_state.get('status') == 'running'
    if already_running:
        return {'status': 'skipped', 'reason': 'Database update already running'}
    _db_update_automation_id = automation_id
    active_server = config_manager.get_active_media_server()
    with db_update_lock:
        db_update_state.update({
            "status": "running", "phase": "Deep scan: Initializing...",
            "progress": 0, "current_item": "", "processed": 0, "total": 0, "error_message": ""
        })
    db_update_executor.submit(_run_deep_scan_task, active_server)
    # Monitor progress (callbacks handle card updates, we just block until done)
    time.sleep(1)
    poll_start = time.time()
    last_progress_time = time.time()
    last_progress_val = 0
    while time.time() - poll_start < 7200:  # Max 2 hours
        time.sleep(3)
        with db_update_lock:
            status = db_update_state.get('status', 'idle')
            current_progress = db_update_state.get('progress', 0)
        if status != 'running':
            break
        if current_progress != last_progress_val:
            last_progress_val = current_progress
            last_progress_time = time.time()
        elif time.time() - last_progress_time > 600:
            # No movement for 10 minutes: warn and reset the stall timer so
            # the warning repeats at most every 10 minutes.
            _update_automation_progress(automation_id,
                log_line='Deep scan appears stalled — waiting...', log_type='warning')
            last_progress_time = time.time()
    else:
        # while/else: loop exhausted its time budget without a break -> timeout
        _update_automation_progress(automation_id, status='error',
            phase='Timed out', log_line='Deep scan timed out after 2 hours', log_type='error')
        return {'status': 'error', 'reason': 'Timed out', '_manages_own_progress': True}
    # Fix: snapshot the final status AND error message under the lock so both
    # values come from the same consistent state.
    with db_update_lock:
        final_status = db_update_state.get('status', 'unknown')
        error_message = db_update_state.get('error_message', 'Unknown error')
    if final_status == 'error':
        return {'status': 'error', 'reason': error_message, '_manages_own_progress': True}
    return {'status': 'completed', '_manages_own_progress': True}
def _auto_run_duplicate_cleaner(config):
automation_id = config.get('_automation_id')
if duplicate_cleaner_state.get('status') == 'running':
@ -933,6 +977,8 @@ def _register_automation_handlers():
automation_engine.register_action_handler('start_database_update', _auto_start_database_update,
lambda: db_update_state.get('status') == 'running')
automation_engine.register_action_handler('deep_scan_library', _auto_deep_scan_library,
lambda: db_update_state.get('status') == 'running')
automation_engine.register_action_handler('run_duplicate_cleaner', _auto_run_duplicate_cleaner,
lambda: duplicate_cleaner_state.get('status') == 'running')
automation_engine.register_action_handler('clear_quarantine', _auto_clear_quarantine)
@ -4417,6 +4463,8 @@ def get_automation_blocks():
"description": "Remove old searches from Soulseek", "available": True},
{"type": "clean_completed_downloads", "label": "Clean Completed Downloads", "icon": "check-square",
"description": "Clear completed downloads and empty directories", "available": True},
{"type": "deep_scan_library", "label": "Deep Scan Library", "icon": "search",
"description": "Full library comparison without losing enrichment data", "available": True},
],
"notifications": [
{"type": "discord_webhook", "label": "Discord Webhook", "icon": "message", "description": "Send a Discord notification", "available": True,
@ -14229,6 +14277,46 @@ def _run_db_update_task(full_refresh, server_type):
db_update_worker.run()
def _run_deep_scan_task(server_type):
"""Run a deep library scan in the background thread.

Resolves the media client for *server_type*, builds a sequential
DatabaseUpdateWorker (full_refresh=False, so no destructive clear),
wires its progress/finish/error notifications to the module-level
db_update_* callbacks, then runs the scan synchronously in this thread.
"""
global db_update_worker
media_client = None
# Map the server type to its module-level client instance
if server_type == "plex":
media_client = plex_client
elif server_type == "jellyfin":
media_client = jellyfin_client
elif server_type == "navidrome":
media_client = navidrome_client
if not media_client:
# Surface the failure through the standard error callback so the UI/state updates
_db_update_error_callback(f"Media client for '{server_type}' not available.")
return
with db_update_lock:
db_update_worker = DatabaseUpdateWorker(
media_client=media_client,
full_refresh=False,
server_type=server_type,
force_sequential=True
)
try:
# Qt-style signal connections when the worker is a real QThread
db_update_worker.progress_updated.connect(_db_update_progress_callback)
db_update_worker.phase_changed.connect(_db_update_phase_callback)
db_update_worker.artist_processed.connect(_db_update_artist_callback)
db_update_worker.finished.connect(_db_update_finished_callback)
db_update_worker.error.connect(_db_update_error_callback)
except AttributeError:
# Headless fallback: worker exposes connect_callback() instead of Qt signals
db_update_worker.connect_callback('progress_updated', _db_update_progress_callback)
db_update_worker.connect_callback('phase_changed', _db_update_phase_callback)
db_update_worker.connect_callback('artist_processed', _db_update_artist_callback)
db_update_worker.connect_callback('finished', _db_update_finished_callback)
db_update_worker.connect_callback('error', _db_update_error_callback)
# Run deep scan instead of normal run()
db_update_worker.run_deep_scan()
@app.route('/api/database/stats', methods=['GET'])
def get_database_stats():
"""Endpoint to get current database statistics."""

Loading…
Cancel
Save