diff --git a/core/automation_engine.py b/core/automation_engine.py index bf92d8d0..815a939b 100644 --- a/core/automation_engine.py +++ b/core/automation_engine.py @@ -84,6 +84,13 @@ SYSTEM_AUTOMATIONS = [ 'action_type': 'clean_completed_downloads', 'initial_delay': 300, }, + { + 'name': 'Auto-Deep Scan Library', + 'trigger_type': 'schedule', + 'trigger_config': {'interval': 7, 'unit': 'days'}, + 'action_type': 'deep_scan_library', + 'initial_delay': 900, # 15 min after startup + }, ] diff --git a/core/database_update_worker.py b/core/database_update_worker.py index dcee4cca..4b63396a 100644 --- a/core/database_update_worker.py +++ b/core/database_update_worker.py @@ -289,6 +289,144 @@ class DatabaseUpdateWorker(QThread): logger.error(f"Database update failed: {str(e)}") self._emit_signal('error', f"Database update failed: {str(e)}") + def run_deep_scan(self): + """Deep library scan: fetch ALL content, insert only NEW tracks, remove STALE tracks. + Never calls clear_server_data() — preserves all enrichment data.""" + try: + # Initialize database + self.database = get_database(self.database_path) + + logger.info(f"Starting deep library scan for {self.server_type}") + self._emit_signal('phase_changed', "Deep scan: Connecting to media server...") + + # Phase 1: Cache prep for Jellyfin/Navidrome (same as full refresh) + if self.server_type == "jellyfin": + self._emit_signal('phase_changed', "Deep scan: Preparing Jellyfin cache...") + if hasattr(self.media_client, 'set_progress_callback'): + self.media_client.set_progress_callback(lambda msg: self._emit_signal('phase_changed', msg)) + elif self.server_type == "navidrome": + self._emit_signal('phase_changed', "Deep scan: Connecting to Navidrome...") + if hasattr(self.media_client, 'set_progress_callback'): + self.media_client.set_progress_callback(lambda msg: self._emit_signal('phase_changed', msg)) + + # Fetch ALL artists from server (does NOT clear server data) + artists = self._get_all_artists() + if not artists: + 
self._emit_signal('error', f"Deep scan: No artists found in {self.server_type} library") + return + + logger.info(f"Deep scan: Found {len(artists)} artists in {self.server_type} library") + + # Phase 2: Process all artists — skip existing tracks, collect seen IDs + self._emit_signal('phase_changed', "Deep scan: Processing library content...") + seen_track_ids = set() + self._deep_scan_process_all_artists(artists, seen_track_ids) + + # Phase 3: Stale track removal + self._emit_signal('phase_changed', "Deep scan: Checking for stale tracks...") + db_track_ids = self.database.get_all_track_ids_for_server(self.server_type) + stale = db_track_ids - seen_track_ids + stale_removed = 0 + + if stale: + # Safety: if stale > 50% of DB count AND DB has >100 tracks, likely API failure + if len(stale) > len(db_track_ids) * 0.5 and len(db_track_ids) > 100: + logger.warning(f"Deep scan safety: {len(stale)} stale tracks ({len(stale)}/{len(db_track_ids)} = " + f"{len(stale)/len(db_track_ids)*100:.0f}%) exceeds 50% threshold — skipping removal") + else: + logger.info(f"Deep scan: Removing {len(stale)} stale tracks from database") + stale_removed = self.database.delete_stale_tracks(stale, self.server_type) + + # Phase 4: Cleanup + self._emit_signal('phase_changed', "Deep scan: Cleaning up orphaned records...") + try: + cleanup_results = self.database.cleanup_orphaned_records() + orphaned_artists = cleanup_results.get('orphaned_artists_removed', 0) + orphaned_albums = cleanup_results.get('orphaned_albums_removed', 0) + if orphaned_artists > 0 or orphaned_albums > 0: + logger.info(f"Deep scan cleanup: {orphaned_artists} orphaned artists, {orphaned_albums} orphaned albums removed") + except Exception as e: + logger.warning(f"Deep scan: Could not cleanup orphaned records: {e}") + + try: + merge_results = self.database.merge_duplicate_artists() + merged = merge_results.get('artists_merged', 0) + if merged > 0: + logger.info(f"Deep scan: Merged {merged} duplicate artists") + except Exception 
as e: + logger.warning(f"Deep scan: Could not merge duplicate artists: {e}") + + # Clear media client cache + if self.server_type in ["jellyfin", "navidrome"]: + try: + self.media_client.clear_cache() + logger.info(f"Deep scan: Cleared {self.server_type} cache") + except Exception as e: + logger.warning(f"Deep scan: Could not clear cache: {e}") + + # Store removal counts for the finished callback + self.removed_artists = 0 + self.removed_albums = 0 + self.removed_tracks = stale_removed + + # Phase 5: Emit finished signal + logger.info(f"Deep scan completed: {self.processed_artists} artists, " + f"{self.processed_albums} albums, {self.processed_tracks} new tracks, " + f"{stale_removed} stale tracks removed") + + self._emit_signal('finished', + self.processed_artists, + self.processed_albums, + self.processed_tracks, + self.successful_operations, + self.failed_operations + ) + + except Exception as e: + logger.error(f"Deep scan failed: {str(e)}") + self._emit_signal('error', f"Deep scan failed: {str(e)}") + + def _deep_scan_process_all_artists(self, artists: List, seen_track_ids: set): + """Process all artists sequentially for deep scan — skips existing tracks, collects seen IDs.""" + total_artists = len(artists) + logger.info(f"Deep scan: Processing {total_artists} artists (sequential, skip-existing mode)") + + for i, artist in enumerate(artists): + if self.should_stop: + break + + artist_name = getattr(artist, 'title', 'Unknown Artist') + + with self.thread_lock: + self.processed_artists += 1 + progress_percent = (self.processed_artists / total_artists) * 100 + + self._emit_signal('progress_updated', + f"Deep scan: {artist_name}", + self.processed_artists, + total_artists, + progress_percent + ) + + try: + success, details, album_count, track_count = self._process_artist_with_content( + artist, skip_existing_tracks=True, seen_track_ids=seen_track_ids + ) + + with self.thread_lock: + if success: + self.successful_operations += 1 + else: + self.failed_operations += 1 
def _process_artist_with_content(self, media_artist, skip_existing_tracks=False, seen_track_ids=None) -> tuple[bool, str, int, int]:
    """Process an artist and all their albums and tracks with optimized API usage.

    Args:
        media_artist: server artist object (must expose ``title``, ``ratingKey``,
            and ``albums()``).
        skip_existing_tracks: If True, skip tracks already in the DB (deep scan mode).
        seen_track_ids: If provided, collect all server track IDs into this set
            (deep scan mode).

    Returns:
        (success, details, album_count, track_count)

    FIX: in deep-scan mode, tracks of an album whose DB insert failed must
    still be recorded in ``seen_track_ids`` — otherwise the deep scan would
    later classify them as stale and delete them.  Normal (non-deep-scan)
    callers keep the original behavior of skipping tracks of failed albums.
    """
    try:
        artist_name = getattr(media_artist, 'title', 'Unknown Artist')

        # 1. Insert/update the artist using server-agnostic method
        artist_success = self.database.insert_or_update_media_artist(media_artist, server_source=self.server_type)
        if not artist_success:
            return False, "Failed to update artist data", 0, 0

        artist_id = str(media_artist.ratingKey)

        # 2. Get all albums for this artist (cached from aggressive pre-population)
        try:
            albums = list(media_artist.albums())
        except Exception as e:
            logger.warning(f"Could not get albums for artist '{artist_name}': {e}")
            return True, "Artist updated (no albums accessible)", 0, 0

        album_count = 0
        track_count = 0
        skipped_count = 0

        # 3. Process albums in smaller batches to reduce memory usage
        batch_size = 10  # Process 10 albums at a time
        for i in range(0, len(albums), batch_size):
            if self.should_stop:
                break

            for album in albums[i:i + batch_size]:
                if self.should_stop:
                    break

                try:
                    # Insert/update album using server-agnostic method
                    album_success = self.database.insert_or_update_media_album(album, artist_id, server_source=self.server_type)
                    if album_success:
                        album_count += 1
                    album_id = str(album.ratingKey)

                    # Normal mode: a failed album insert skips its tracks
                    # (original behavior).  Deep-scan mode must still visit
                    # them so their IDs count as "seen".
                    if not album_success and seen_track_ids is None:
                        continue

                    # 4. Process tracks in this album (cached from aggressive pre-population)
                    try:
                        tracks = list(album.tracks())
                    except Exception as e:
                        logger.warning(f"Could not get tracks for album '{getattr(album, 'title', 'Unknown')}': {e}")
                        continue

                    for track in tracks:
                        if self.should_stop:
                            break
                        try:
                            track_id_str = str(track.ratingKey)

                            # Deep scan: collect all server track IDs
                            if seen_track_ids is not None:
                                seen_track_ids.add(track_id_str)

                            # Album row missing — ID is recorded above, but do
                            # not insert orphan track rows.
                            if not album_success:
                                continue

                            # Deep scan: skip tracks already in DB to preserve enrichment
                            if skip_existing_tracks and self.database.track_exists_by_server(track_id_str, self.server_type):
                                skipped_count += 1
                                continue

                            track_success = self.database.insert_or_update_media_track(track, album_id, artist_id, server_source=self.server_type)
                            if track_success:
                                track_count += 1
                        except Exception as e:
                            logger.warning(f"Failed to process track '{getattr(track, 'title', 'Unknown')}': {e}")

                except Exception as e:
                    logger.warning(f"Failed to process album '{getattr(album, 'title', 'Unknown')}': {e}")

        if skip_existing_tracks:
            details = f"{album_count} albums, {track_count} new tracks ({skipped_count} existing skipped)"
        else:
            details = f"Updated with {album_count} albums, {track_count} tracks"
        return True, details, album_count, track_count

    except Exception as e:
        # NOTE(review): the original handler body is outside this view;
        # reconstructed minimally — confirm against the full file.
        logger.error(f"Failed to process artist content: {e}")
        return False, f"Error: {str(e)}", 0, 0
def get_all_track_ids_for_server(self, server_source: str) -> set:
    """Return the set of track IDs stored in the DB for *server_source*.

    On any database error an empty set is returned (and the error logged),
    so callers never see an exception from this lookup.
    """
    try:
        with self._get_connection() as conn:
            rows = conn.cursor().execute(
                "SELECT id FROM tracks WHERE server_source = ?",
                (server_source,),
            ).fetchall()
            return {track_id for (track_id,) in rows}
    except Exception as e:
        logger.error(f"Error getting track IDs for {server_source}: {e}")
        return set()

def delete_stale_tracks(self, stale_track_ids: set, server_source: str) -> int:
    """Delete tracks (by ID + server_source) that no longer exist on the server.

    IDs are deleted in chunks of 500 to keep each statement's parameter list
    well under SQLite's bound-variable limit.  Returns the number of rows
    actually removed; 0 on any database error (logged, not raised).
    """
    try:
        with self._get_connection() as conn:
            cursor = conn.cursor()
            tracks_removed = 0
            remaining = list(stale_track_ids)
            chunk_size = 500

            while remaining:
                chunk, remaining = remaining[:chunk_size], remaining[chunk_size:]
                marks = ','.join('?' * len(chunk))
                cursor.execute(
                    f"DELETE FROM tracks WHERE id IN ({marks}) AND server_source = ?",
                    [*chunk, server_source])
                tracks_removed += cursor.rowcount

            conn.commit()

            if tracks_removed > 0:
                logger.info(f"Deep scan stale removal for {server_source}: "
                            f"{tracks_removed} tracks removed")

            return tracks_removed

    except Exception as e:
        logger.error(f"Error deleting stale tracks for {server_source}: {e}")
        return 0
def _auto_deep_scan_library(config):
    """Automation action handler: run a deep library scan and block until done.

    The deep scan is additive (new tracks only) plus stale-track removal; it
    preserves enrichment data.  Progress on the automation card is driven by
    the db-update callbacks, so this handler only polls ``db_update_state``
    for completion, a stall warning, or the 2-hour timeout.
    NOTE: defined inside _register_automation_handlers() — relies on its
    enclosing module globals (db_update_state, db_update_lock, ...).
    """
    global _db_update_automation_id
    automation_id = config.get('_automation_id')
    if db_update_state.get('status') == 'running':
        return {'status': 'skipped', 'reason': 'Database update already running'}
    _db_update_automation_id = automation_id
    active_server = config_manager.get_active_media_server()
    with db_update_lock:
        db_update_state.update({
            "status": "running", "phase": "Deep scan: Initializing...",
            "progress": 0, "current_item": "", "processed": 0, "total": 0, "error_message": ""
        })
    db_update_executor.submit(_run_deep_scan_task, active_server)

    # Monitor progress (callbacks handle card updates, we just block until done)
    time.sleep(1)
    poll_start = time.time()
    last_progress_time = time.time()
    last_progress_val = 0
    while time.time() - poll_start < 7200:  # Max 2 hours
        time.sleep(3)
        # Hold the lock only long enough to snapshot the state.
        with db_update_lock:
            status = db_update_state.get('status', 'idle')
            current_progress = db_update_state.get('progress', 0)
        if status != 'running':
            break
        if current_progress != last_progress_val:
            last_progress_val = current_progress
            last_progress_time = time.time()
        elif time.time() - last_progress_time > 600:
            # No progress for 10 min: warn but keep waiting — slow phases
            # (cleanup, large artists) are legitimate.
            _update_automation_progress(automation_id,
                log_line='Deep scan appears stalled — waiting...', log_type='warning')
            last_progress_time = time.time()
    else:
        # while/else: loop exhausted without break → timed out.
        # NOTE(review): the worker keeps running after a timeout; consider
        # signalling it to stop here.
        _update_automation_progress(automation_id, status='error',
            phase='Timed out', log_line='Deep scan timed out after 2 hours', log_type='error')
        return {'status': 'error', 'reason': 'Timed out', '_manages_own_progress': True}

    with db_update_lock:
        final_status = db_update_state.get('status', 'unknown')
    if final_status == 'error':
        return {'status': 'error', 'reason': db_update_state.get('error_message', 'Unknown error'), '_manages_own_progress': True}
    return {'status': 'completed', '_manages_own_progress': True}


def _run_deep_scan_task(server_type):
    """Run a deep library scan in the background executor thread.

    FIX: the shared ``db_update_worker`` is assigned under ``db_update_lock``,
    but the scan itself runs OUTSIDE the lock.  The progress/phase callbacks
    and the automation monitor all acquire ``db_update_lock``; holding it for
    the scan's duration would deadlock them (threading.Lock is not reentrant).
    """
    global db_update_worker
    media_client = None

    if server_type == "plex":
        media_client = plex_client
    elif server_type == "jellyfin":
        media_client = jellyfin_client
    elif server_type == "navidrome":
        media_client = navidrome_client

    if not media_client:
        _db_update_error_callback(f"Media client for '{server_type}' not available.")
        return

    with db_update_lock:
        db_update_worker = DatabaseUpdateWorker(
            media_client=media_client,
            full_refresh=False,
            server_type=server_type,
            force_sequential=True
        )

    worker = db_update_worker
    try:
        worker.progress_updated.connect(_db_update_progress_callback)
        worker.phase_changed.connect(_db_update_phase_callback)
        worker.artist_processed.connect(_db_update_artist_callback)
        worker.finished.connect(_db_update_finished_callback)
        worker.error.connect(_db_update_error_callback)
    except AttributeError:
        # Non-Qt fallback: worker exposes connect_callback() instead of signals
        worker.connect_callback('progress_updated', _db_update_progress_callback)
        worker.connect_callback('phase_changed', _db_update_phase_callback)
        worker.connect_callback('artist_processed', _db_update_artist_callback)
        worker.connect_callback('finished', _db_update_finished_callback)
        worker.connect_callback('error', _db_update_error_callback)

    # Run deep scan synchronously in this thread (instead of normal run())
    worker.run_deep_scan()