From 01ac3923077ef676bffe8c60cbdb719915f513f2 Mon Sep 17 00:00:00 2001 From: Broque Thomas Date: Sun, 1 Feb 2026 16:15:22 -0800 Subject: [PATCH] Fix post-processing race condition between Stream Processor and Verification Worker Added per-context-key threading lock in _post_process_matched_download to serialize access when both the Stream Processor and Verification Worker attempt to process the same downloaded file. Previously, both threads would race to move the source file via shutil.move, causing FileNotFoundError for the loser. The lock ensures the first thread completes the move before the second enters, where existing protection checks detect the file was already transferred and return early. --- web_server.py | 108 +++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 90 insertions(+), 18 deletions(-) diff --git a/web_server.py b/web_server.py index 761df19b..cb2f3de3 100644 --- a/web_server.py +++ b/web_server.py @@ -8191,18 +8191,41 @@ def _safe_move_file(src, dst): # Ensure parent directory exists dst.parent.mkdir(parents=True, exist_ok=True) + # If source doesn't exist, check if it was already moved to destination + # This happens when a retry or parallel thread already transferred the file + if not src.exists(): + if dst.exists(): + logger.info(f"Source gone but destination exists, file already transferred: {dst.name}") + return + else: + raise FileNotFoundError(f"Source file not found and destination does not exist: {src}") + # On Windows, shutil.move fails with FileExistsError if destination exists. - # Remove it first to prevent the error chain. + # Remove it first, retrying briefly for Windows file locks (e.g. Plex scanning). if dst.exists(): - try: - dst.unlink() - except Exception: - pass # If we can't remove it, let shutil.move handle the error + for _attempt in range(3): + try: + dst.unlink() + break + except PermissionError: + if _attempt < 2: + time.sleep(1) + else: + logger.warning(f"Could not remove locked destination after 3 attempts: {dst.name}") + except Exception: + break try: # Try standard move first (works if same filesystem) shutil.move(str(src), str(dst)) return + except FileNotFoundError: + # Source vanished between our exists() check and the move - another thread got it first + # If destination now exists, the other thread completed the transfer successfully + if dst.exists(): + logger.info(f"Source moved by another thread, destination exists: {dst.name}") + return + raise except (OSError, PermissionError) as e: # If it's a cross-device link error or permission error, do manual copy if "cross-device" in str(e).lower() or "operation not permitted" in str(e).lower(): @@ -8234,6 +8257,18 @@ def _post_process_matched_download(context_key, context, file_path): Also handles simple downloads (from search page "Download" button) which just move files to /Transfer without metadata enhancement. """ + # --- PER-FILE LOCK --- + # Acquire a per-context-key lock so only one thread processes a given file at a time. + # The Stream Processor and Verification Worker both call this function for the same file. + # Without serialization, they race to move the source file and the loser gets FileNotFoundError. + # With the lock, the second thread waits, then the existing protection checks detect + # "source gone + destination exists" and return early. + with _post_process_locks_lock: + if context_key not in _post_process_locks: + _post_process_locks[context_key] = threading.Lock() + file_lock = _post_process_locks[context_key] + + file_lock.acquire() try: import os import shutil @@ -8539,13 +8574,16 @@ def _post_process_matched_download(context_key, context, file_path): print(f"🚚 Moving '{os.path.basename(file_path)}' to '{final_path}'") if os.path.exists(final_path): - # PROTECTION: Check if existing file already has metadata enhancement - # This prevents race conditions where later downloads overwrite properly processed files + # PROTECTION: If destination already exists, check before overwriting + # If the source file is gone, another thread already handled this - don't delete the destination + if not os.path.exists(file_path): + print(f"āœ… [Protection] Destination exists and source already gone - file already transferred: {os.path.basename(final_path)}") + return try: from mutagen import File as MutagenFile existing_file = MutagenFile(final_path) has_metadata = existing_file is not None and len(existing_file.tags or {}) > 2 # More than basic tags - + if has_metadata: print(f"āš ļø [Protection] Existing file already has metadata enhancement - skipping overwrite: {os.path.basename(final_path)}") print(f"šŸ—‘ļø [Protection] Removing redundant download file: {os.path.basename(file_path)}") @@ -8570,6 +8608,22 @@ def _post_process_matched_download(context_key, context, file_path): except Exception as e: print(f"āš ļø [Protection] Failed to remove existing file for overwrite: {e}") + # --- PRE-MOVE SOURCE CHECK --- + # Right before moving, verify the source file still exists. + # Another thread (Stream Processor or Verification Worker) may have + # already moved this file during the sleep + metadata enhancement window. + if not os.path.exists(file_path): + if os.path.exists(final_path): + print(f"āœ… [Pre-Move] Source already gone and destination exists - another thread completed transfer: {os.path.basename(final_path)}") + # Still do cover art + lyrics since the other thread might not have finished those + _download_cover_art(album_info, os.path.dirname(final_path)) + _generate_lrc_file(final_path, context, spotify_artist, album_info) + return + else: + # Source gone, destination not there either - check if dest dir has the file under a slight name variation + print(f"āš ļø [Pre-Move] Source file gone and destination not found: {os.path.basename(file_path)}") + raise FileNotFoundError(f"Source file vanished before move and destination does not exist: {file_path}") + _safe_move_file(file_path, final_path) _download_cover_art(album_info, os.path.dirname(final_path)) @@ -8610,20 +8664,38 @@ def _post_process_matched_download(context_key, context, file_path): print(f"\nāŒ CRITICAL ERROR in post-processing for {context_key}: {e}") traceback.print_exc() - # Remove from processed set so it can be retried - if context_key in _processed_download_ids: - _processed_download_ids.remove(context_key) - print(f"šŸ”„ Removed {context_key} from processed set - will retry on next check") - - # Re-add to matched context for retry - with matched_context_lock: - if context_key not in matched_downloads_context: - matched_downloads_context[context_key] = context - print(f"ā™»ļø Re-added {context_key} to context for retry") + # Only retry if the source file still exists - otherwise retrying is pointless + # and creates an infinite loop of failures + import os as _os + source_exists = _os.path.exists(file_path) if file_path else False + if source_exists: + # Remove from processed set so it can be retried + if context_key in _processed_download_ids: + _processed_download_ids.remove(context_key) + print(f"šŸ”„ Removed {context_key} from processed set - will retry on next check") + + # Re-add to matched context for retry + with matched_context_lock: + if context_key not in matched_downloads_context: + matched_downloads_context[context_key] = context + print(f"ā™»ļø Re-added {context_key} to context for retry") + else: + print(f"āš ļø Source file gone, not retrying: {context_key}") + finally: + file_lock.release() + # Clean up the lock entry to prevent unbounded memory growth + with _post_process_locks_lock: + _post_process_locks.pop(context_key, None) # Keep track of processed downloads to avoid re-processing _processed_download_ids = set() +# Per-context-key locks to prevent two threads from processing the same file simultaneously. +# Without this, the Stream Processor and Verification Worker race to move the same source file, +# and the loser gets a FileNotFoundError because the winner already moved it. +_post_process_locks = {} # {context_key: threading.Lock()} +_post_process_locks_lock = threading.Lock() # protects the dict itself + # --- File Discovery Retry System --- # Prevents race condition where slskd reports completion before file is written to disk # Tracks retry attempts per download to give files time to finish writing