Fix post-processing race condition between Stream Processor and Verification Worker

Added a per-context-key threading lock in _post_process_matched_download to serialize access when both the Stream Processor and Verification Worker attempt to process the same downloaded file. Previously, both threads would race to move the source file via shutil.move, causing a FileNotFoundError for the loser. The lock ensures the first thread completes the move before the second enters; the existing protection checks then detect that the file was already transferred and return early.
pull/130/head
Broque Thomas 3 months ago
parent 268735eeff
commit 01ac392307

@@ -8191,18 +8191,41 @@ def _safe_move_file(src, dst):
# Ensure parent directory exists
dst.parent.mkdir(parents=True, exist_ok=True)
# If source doesn't exist, check if it was already moved to destination
# This happens when a retry or parallel thread already transferred the file
if not src.exists():
if dst.exists():
logger.info(f"Source gone but destination exists, file already transferred: {dst.name}")
return
else:
raise FileNotFoundError(f"Source file not found and destination does not exist: {src}")
# On Windows, shutil.move fails with FileExistsError if destination exists.
# Remove it first to prevent the error chain.
# Remove it first, retrying briefly for Windows file locks (e.g. Plex scanning).
if dst.exists():
try:
dst.unlink()
except Exception:
pass # If we can't remove it, let shutil.move handle the error
for _attempt in range(3):
try:
dst.unlink()
break
except PermissionError:
if _attempt < 2:
time.sleep(1)
else:
logger.warning(f"Could not remove locked destination after 3 attempts: {dst.name}")
except Exception:
break
try:
# Try standard move first (works if same filesystem)
shutil.move(str(src), str(dst))
return
except FileNotFoundError:
# Source vanished between our exists() check and the move - another thread got it first
# If destination now exists, the other thread completed the transfer successfully
if dst.exists():
logger.info(f"Source moved by another thread, destination exists: {dst.name}")
return
raise
except (OSError, PermissionError) as e:
# If it's a cross-device link error or permission error, do manual copy
if "cross-device" in str(e).lower() or "operation not permitted" in str(e).lower():
@@ -8234,6 +8257,18 @@ def _post_process_matched_download(context_key, context, file_path):
Also handles simple downloads (from search page "Download" button) which
just move files to /Transfer without metadata enhancement.
"""
# --- PER-FILE LOCK ---
# Acquire a per-context-key lock so only one thread processes a given file at a time.
# The Stream Processor and Verification Worker both call this function for the same file.
# Without serialization, they race to move the source file and the loser gets FileNotFoundError.
# With the lock, the second thread waits, then the existing protection checks detect
# "source gone + destination exists" and return early.
with _post_process_locks_lock:
if context_key not in _post_process_locks:
_post_process_locks[context_key] = threading.Lock()
file_lock = _post_process_locks[context_key]
file_lock.acquire()
try:
import os
import shutil
@@ -8539,13 +8574,16 @@ def _post_process_matched_download(context_key, context, file_path):
print(f"🚚 Moving '{os.path.basename(file_path)}' to '{final_path}'")
if os.path.exists(final_path):
# PROTECTION: Check if existing file already has metadata enhancement
# This prevents race conditions where later downloads overwrite properly processed files
# PROTECTION: If destination already exists, check before overwriting
# If the source file is gone, another thread already handled this - don't delete the destination
if not os.path.exists(file_path):
print(f"✅ [Protection] Destination exists and source already gone - file already transferred: {os.path.basename(final_path)}")
return
try:
from mutagen import File as MutagenFile
existing_file = MutagenFile(final_path)
has_metadata = existing_file is not None and len(existing_file.tags or {}) > 2 # More than basic tags
if has_metadata:
print(f"⚠️ [Protection] Existing file already has metadata enhancement - skipping overwrite: {os.path.basename(final_path)}")
print(f"🗑️ [Protection] Removing redundant download file: {os.path.basename(file_path)}")
@@ -8570,6 +8608,22 @@ def _post_process_matched_download(context_key, context, file_path):
except Exception as e:
print(f"⚠️ [Protection] Failed to remove existing file for overwrite: {e}")
# --- PRE-MOVE SOURCE CHECK ---
# Right before moving, verify the source file still exists.
# Another thread (Stream Processor or Verification Worker) may have
# already moved this file during the sleep + metadata enhancement window.
if not os.path.exists(file_path):
if os.path.exists(final_path):
print(f"✅ [Pre-Move] Source already gone and destination exists - another thread completed transfer: {os.path.basename(final_path)}")
# Still do cover art + lyrics since the other thread might not have finished those
_download_cover_art(album_info, os.path.dirname(final_path))
_generate_lrc_file(final_path, context, spotify_artist, album_info)
return
else:
# Source gone, destination not there either - check if dest dir has the file under a slight name variation
print(f"⚠️ [Pre-Move] Source file gone and destination not found: {os.path.basename(file_path)}")
raise FileNotFoundError(f"Source file vanished before move and destination does not exist: {file_path}")
_safe_move_file(file_path, final_path)
_download_cover_art(album_info, os.path.dirname(final_path))
@ -8610,20 +8664,38 @@ def _post_process_matched_download(context_key, context, file_path):
print(f"\n❌ CRITICAL ERROR in post-processing for {context_key}: {e}")
traceback.print_exc()
# Remove from processed set so it can be retried
if context_key in _processed_download_ids:
_processed_download_ids.remove(context_key)
print(f"🔄 Removed {context_key} from processed set - will retry on next check")
# Re-add to matched context for retry
with matched_context_lock:
if context_key not in matched_downloads_context:
matched_downloads_context[context_key] = context
print(f"♻️ Re-added {context_key} to context for retry")
# Only retry if the source file still exists - otherwise retrying is pointless
# and creates an infinite loop of failures
import os as _os
source_exists = _os.path.exists(file_path) if file_path else False
if source_exists:
# Remove from processed set so it can be retried
if context_key in _processed_download_ids:
_processed_download_ids.remove(context_key)
print(f"🔄 Removed {context_key} from processed set - will retry on next check")
# Re-add to matched context for retry
with matched_context_lock:
if context_key not in matched_downloads_context:
matched_downloads_context[context_key] = context
print(f"♻️ Re-added {context_key} to context for retry")
else:
print(f"⚠️ Source file gone, not retrying: {context_key}")
finally:
file_lock.release()
# Clean up the lock entry to prevent unbounded memory growth
with _post_process_locks_lock:
_post_process_locks.pop(context_key, None)
# Context keys whose downloads have already been post-processed, so the
# Stream Processor / Verification Worker do not re-process the same download.
_processed_download_ids = set()
# Per-context-key locks so only one thread post-processes a given file at a time.
# Without this, the Stream Processor and Verification Worker race to move the
# same source file, and the loser gets a FileNotFoundError because the winner
# already moved it. Entries are created lazily and removed after processing
# (see the finally-block cleanup) to avoid unbounded growth.
_post_process_locks = {} # {context_key: threading.Lock()}
_post_process_locks_lock = threading.Lock() # protects the dict itself
# --- File Discovery Retry System ---
# Prevents race condition where slskd reports completion before file is written to disk
# Tracks retry attempts per download to give files time to finish writing

Loading…
Cancel
Save