diff --git a/web_server.py b/web_server.py index 761df19b..cb2f3de3 100644 --- a/web_server.py +++ b/web_server.py @@ -8191,18 +8191,41 @@ def _safe_move_file(src, dst): # Ensure parent directory exists dst.parent.mkdir(parents=True, exist_ok=True) + # If source doesn't exist, check if it was already moved to destination + # This happens when a retry or parallel thread already transferred the file + if not src.exists(): + if dst.exists(): + logger.info(f"Source gone but destination exists, file already transferred: {dst.name}") + return + else: + raise FileNotFoundError(f"Source file not found and destination does not exist: {src}") + # On Windows, shutil.move fails with FileExistsError if destination exists. - # Remove it first to prevent the error chain. + # Remove it first, retrying briefly for Windows file locks (e.g. Plex scanning). if dst.exists(): - try: - dst.unlink() - except Exception: - pass # If we can't remove it, let shutil.move handle the error + for _attempt in range(3): + try: + dst.unlink() + break + except PermissionError: + if _attempt < 2: + time.sleep(1) + else: + logger.warning(f"Could not remove locked destination after 3 attempts: {dst.name}") + except Exception: + break try: # Try standard move first (works if same filesystem) shutil.move(str(src), str(dst)) return + except FileNotFoundError: + # Source vanished between our exists() check and the move - another thread got it first + # If destination now exists, the other thread completed the transfer successfully + if dst.exists(): + logger.info(f"Source moved by another thread, destination exists: {dst.name}") + return + raise except (OSError, PermissionError) as e: # If it's a cross-device link error or permission error, do manual copy if "cross-device" in str(e).lower() or "operation not permitted" in str(e).lower(): @@ -8234,6 +8257,18 @@ def _post_process_matched_download(context_key, context, file_path): Also handles simple downloads (from search page "Download" button) which just move files to /Transfer without metadata enhancement. """ + # --- PER-FILE LOCK --- + # Acquire a per-context-key lock so only one thread processes a given file at a time. + # The Stream Processor and Verification Worker both call this function for the same file. + # Without serialization, they race to move the source file and the loser gets FileNotFoundError. + # With the lock, the second thread waits, then the existing protection checks detect + # "source gone + destination exists" and return early. + with _post_process_locks_lock: + if context_key not in _post_process_locks: + _post_process_locks[context_key] = threading.Lock() + file_lock = _post_process_locks[context_key] + + file_lock.acquire() try: import os import shutil @@ -8539,13 +8574,16 @@ def _post_process_matched_download(context_key, context, file_path): print(f"🚚 Moving '{os.path.basename(file_path)}' to '{final_path}'") if os.path.exists(final_path): - # PROTECTION: Check if existing file already has metadata enhancement - # This prevents race conditions where later downloads overwrite properly processed files + # PROTECTION: If destination already exists, check before overwriting + # If the source file is gone, another thread already handled this - don't delete the destination + if not os.path.exists(file_path): + print(f"āœ… [Protection] Destination exists and source already gone - file already transferred: {os.path.basename(final_path)}") + return try: from mutagen import File as MutagenFile existing_file = MutagenFile(final_path) has_metadata = existing_file is not None and len(existing_file.tags or {}) > 2 # More than basic tags - + if has_metadata: print(f"āš ļø [Protection] Existing file already has metadata enhancement - skipping overwrite: {os.path.basename(final_path)}") print(f"šŸ—‘ļø [Protection] Removing redundant download file: {os.path.basename(file_path)}") @@ -8570,6 +8608,22 @@ def _post_process_matched_download(context_key, context, file_path): except Exception as e: print(f"āš ļø [Protection] Failed to remove existing file for overwrite: {e}") + # --- PRE-MOVE SOURCE CHECK --- + # Right before moving, verify the source file still exists. + # Another thread (Stream Processor or Verification Worker) may have + # already moved this file during the sleep + metadata enhancement window. + if not os.path.exists(file_path): + if os.path.exists(final_path): + print(f"āœ… [Pre-Move] Source already gone and destination exists - another thread completed transfer: {os.path.basename(final_path)}") + # Still do cover art + lyrics since the other thread might not have finished those + _download_cover_art(album_info, os.path.dirname(final_path)) + _generate_lrc_file(final_path, context, spotify_artist, album_info) + return + else: + # Source gone, destination not there either - check if dest dir has the file under a slight name variation + print(f"āš ļø [Pre-Move] Source file gone and destination not found: {os.path.basename(file_path)}") + raise FileNotFoundError(f"Source file vanished before move and destination does not exist: {file_path}") + _safe_move_file(file_path, final_path) _download_cover_art(album_info, os.path.dirname(final_path)) @@ -8610,20 +8664,38 @@ def _post_process_matched_download(context_key, context, file_path): print(f"\nāŒ CRITICAL ERROR in post-processing for {context_key}: {e}") traceback.print_exc() - # Remove from processed set so it can be retried - if context_key in _processed_download_ids: - _processed_download_ids.remove(context_key) - print(f"šŸ”„ Removed {context_key} from processed set - will retry on next check") - - # Re-add to matched context for retry - with matched_context_lock: - if context_key not in matched_downloads_context: - matched_downloads_context[context_key] = context - print(f"ā™»ļø Re-added {context_key} to context for retry") + # Only retry if the source file still exists - otherwise retrying is pointless + # and creates an infinite loop of failures + import os as _os + source_exists = _os.path.exists(file_path) if file_path else False + if source_exists: + # Remove from processed set so it can be retried + if context_key in _processed_download_ids: + _processed_download_ids.remove(context_key) + print(f"šŸ”„ Removed {context_key} from processed set - will retry on next check") + + # Re-add to matched context for retry + with matched_context_lock: + if context_key not in matched_downloads_context: + matched_downloads_context[context_key] = context + print(f"ā™»ļø Re-added {context_key} to context for retry") + else: + print(f"āš ļø Source file gone, not retrying: {context_key}") + finally: + file_lock.release() + # Clean up the lock entry to prevent unbounded memory growth + with _post_process_locks_lock: + _post_process_locks.pop(context_key, None) # Keep track of processed downloads to avoid re-processing _processed_download_ids = set() +# Per-context-key locks to prevent two threads from processing the same file simultaneously. +# Without this, the Stream Processor and Verification Worker race to move the same source file, +# and the loser gets a FileNotFoundError because the winner already moved it. +_post_process_locks = {} # {context_key: threading.Lock()} +_post_process_locks_lock = threading.Lock() # protects the dict itself + # --- File Discovery Retry System --- # Prevents race condition where slskd reports completion before file is written to disk # Tracks retry attempts per download to give files time to finish writing