Verify bytes before marking downloads complete

Add robust checks and stabilization before treating transfers as finished. The patch verifies bytesTransferred vs expected size in multiple places (monitor, API polling, YouTube handling, and batch status building) to avoid acting on premature "Completed/Succeeded" states. It adds per-poll file-claiming to prevent two contexts from grabbing the same file, introduces file-stability loops and retries when locating/moving files, replaces a blind 1s wait with repeated size checks, and fsyncs destination files after copy. Also introduces an _incomplete_warned flag to reduce log spam and ensures post-processing is triggered reliably once files are truly stable.
pull/165/head
Broque Thomas 3 months ago
parent 1283041836
commit a8e84fa20f

@ -614,7 +614,19 @@ class WebUIDownloadMonitor:
# Must exclude error states first (matching _build_batch_status_data's prioritized checking)
has_error = ('Errored' in state or 'Failed' in state or 'Rejected' in state or 'TimedOut' in state)
has_completion = ('Completed' in state or 'Succeeded' in state)
# Verify bytes actually transferred before trusting state string.
# slskd can report "Completed" before the full file is flushed to disk,
# or on connection drops that leave a partial file.
if has_completion and not has_error:
expected_size = live_info.get('size', 0)
transferred = live_info.get('bytesTransferred', 0)
if expected_size > 0 and transferred < expected_size:
if not task.get('_incomplete_warned'):
print(f"⚠️ Monitor: {task_id} state={state} but bytes incomplete ({transferred}/{expected_size}) — waiting")
task['_incomplete_warned'] = True
continue
if has_completion and not has_error and task['status'] == 'downloading':
task.pop('_incomplete_warned', None)
# CRITICAL FIX: Transition to 'post_processing' HERE so downloads
# don't depend on browser polling to trigger post-processing.
# Previously, post-processing was only submitted by _build_batch_status_data
@ -1380,7 +1392,11 @@ def _prepare_stream_task(track_data):
# Track queue state timing (matching GUI logic)
is_queued = ('queued' in download_state or 'initializing' in download_state)
is_downloading = ('inprogress' in download_state or 'transferring' in download_state)
is_completed = ('succeeded' in download_state or api_progress >= 100)
# Verify bytes match before trusting state/progress
_stream_expected = download_status.get('size', 0)
_stream_transferred = download_status.get('bytesTransferred', 0)
_bytes_ok = _stream_expected <= 0 or _stream_transferred >= _stream_expected
is_completed = ('succeeded' in download_state or api_progress >= 100) and _bytes_ok
# Handle queue state timing
if is_queued and queue_start_time is None:
@ -1416,11 +1432,24 @@ def _prepare_stream_task(track_data):
# Check if download is complete
if is_completed:
print(f"✓ Download completed via API status: {original_state}")
# Give file system time to sync
time.sleep(1)
# Wait for file to stabilise on disk before moving
found_file = _find_downloaded_file(download_path, track_data)
if found_file:
_prev_sz = -1
for _sc in range(4):
try:
_cur_sz = os.path.getsize(found_file)
except OSError:
_cur_sz = -1
if _cur_sz == _prev_sz and _cur_sz > 0:
break
_prev_sz = _cur_sz
time.sleep(1.5)
# Re-find in case it wasn't found on first try
if not found_file:
found_file = _find_downloaded_file(download_path, track_data)
# Retry file search a few times (matching GUI logic)
retry_attempts = 5
@ -4615,6 +4644,10 @@ def get_download_status():
# Don't return early if no Soulseek transfers - YouTube downloads need to be checked too!
all_transfers = []
completed_matched_downloads = []
# Track files already claimed this poll cycle to prevent two contexts from
# grabbing the same physical file when downloads share a basename (e.g.,
# "07 - Aurora.flac" from two different albums/artists).
_files_claimed_this_cycle = set()
if not transfers_data:
# No Soulseek transfers, but continue to check YouTube downloads below
@ -4633,6 +4666,11 @@ def get_download_status():
# Check for completion state
if ('succeeded' in state or 'completed' in state) and 'errored' not in state and 'rejected' not in state:
# Verify bytes actually transferred before trusting state
_fi_size = file_info.get('size', 0)
_fi_transferred = file_info.get('bytesTransferred', 0)
if _fi_size > 0 and _fi_transferred < _fi_size:
continue # Not truly complete yet
filename_from_api = file_info.get('filename')
if not filename_from_api: continue
@ -4698,17 +4736,23 @@ def get_download_status():
found_path = found_result[0] if found_result and found_result[0] else None
if found_path:
print(f"🎯 Found completed matched file on disk: {found_path}")
completed_matched_downloads.append((context_key, context, found_path))
# Don't add to _processed_download_ids yet - wait until thread starts successfully
# Clean up retry tracking if file was found after retries
with _download_retry_lock:
if context_key in _download_retry_attempts:
retry_count = _download_retry_attempts[context_key]['count']
elapsed = time.time() - _download_retry_attempts[context_key]['first_attempt']
print(f"✅ File found after {retry_count} retry attempt(s) ({elapsed:.1f}s): {os.path.basename(filename_from_api)}")
del _download_retry_attempts[context_key]
# Prevent two contexts from claiming the same physical file
_norm_path = os.path.normpath(found_path)
if _norm_path in _files_claimed_this_cycle:
print(f"⚠️ File already claimed by another context this cycle: {os.path.basename(found_path)} — deferring to next poll")
else:
_files_claimed_this_cycle.add(_norm_path)
print(f"🎯 Found completed matched file on disk: {found_path}")
completed_matched_downloads.append((context_key, context, found_path))
# Don't add to _processed_download_ids yet - wait until thread starts successfully
# Clean up retry tracking if file was found after retries
with _download_retry_lock:
if context_key in _download_retry_attempts:
retry_count = _download_retry_attempts[context_key]['count']
elapsed = time.time() - _download_retry_attempts[context_key]['first_attempt']
print(f"✅ File found after {retry_count} retry attempt(s) ({elapsed:.1f}s): {os.path.basename(filename_from_api)}")
del _download_retry_attempts[context_key]
else:
# File not found yet - implement retry logic instead of immediate give-up
# This fixes race condition where slskd reports completion before file is written to disk
@ -4785,7 +4829,9 @@ def get_download_status():
all_transfers.append(youtube_transfer)
# Check if YouTube download is completed and needs post-processing
if download.state and ('succeeded' in download.state.lower() or 'completed' in download.state.lower()):
# Verify bytes match before trusting state string
_yt_bytes_ok = download.size <= 0 or download.transferred >= download.size
if _yt_bytes_ok and download.state and ('succeeded' in download.state.lower() or 'completed' in download.state.lower()):
context_key = _make_context_key(download.username, download.filename)
with matched_context_lock:
@ -4797,6 +4843,12 @@ def get_download_status():
found_path = found_result[0] if found_result and found_result[0] else None
if found_path:
# Prevent two contexts from claiming the same physical file
_yt_norm = os.path.normpath(found_path)
if _yt_norm in _files_claimed_this_cycle:
print(f"⚠️ [YouTube] File already claimed this cycle: {os.path.basename(found_path)} — deferring")
continue
_files_claimed_this_cycle.add(_yt_norm)
print(f"🎯 [YouTube] Found completed matched file on disk: {found_path}")
# Start post-processing thread
def process_youtube_download():
@ -9953,6 +10005,8 @@ def _safe_move_file(src, dst):
with open(src, 'rb') as f_src:
with open(dst, 'wb') as f_dst:
shutil.copyfileobj(f_src, f_dst)
f_dst.flush()
os.fsync(f_dst.fileno())
# Delete source after successful copy
try:
@ -9996,12 +10050,27 @@ def _post_process_matched_download(context_key, context, file_path):
import time
from pathlib import Path
# --- GUI PARITY FIX: Add a delay to prevent file lock race conditions ---
# The GUI app waits 1 second to ensure the file handle is released by
# the download client before attempting to move or modify it.
print(f"⏳ Waiting 1 second for file handle release for: {os.path.basename(file_path)}")
time.sleep(1)
# --- END OF FIX ---
# --- FILE STABILITY CHECK ---
# Wait for the file to stop growing before processing. slskd may still be
# flushing write buffers when it reports "Completed". We poll the file size
# a few times; once it stabilises we know the write is finished.
_basename = os.path.basename(file_path)
_prev_size = -1
for _stability_check in range(5):
try:
_cur_size = os.path.getsize(file_path)
except OSError:
_cur_size = -1
if _cur_size == _prev_size and _cur_size > 0:
# Size unchanged — file is stable
break
_prev_size = _cur_size
if _stability_check == 0:
print(f"⏳ Waiting for file to stabilise: {_basename} ({_cur_size} bytes)")
time.sleep(1.5)
else:
print(f"⚠️ File may still be writing after stability checks: {_basename} ({_prev_size} bytes)")
# --- END FILE STABILITY CHECK ---
# --- ACOUSTID VERIFICATION ---
# Optional verification that downloaded audio matches expected track.
@ -15905,9 +15974,16 @@ def _build_batch_status_data(batch_id, batch, live_transfers_lookup):
task_status['status'] = 'downloading' # Default to downloading for error detection
task['status'] = 'downloading'
elif 'Completed' in state_str or 'Succeeded' in state_str:
# Verify bytes actually transferred before trusting state string
expected_size = live_info.get('size', 0)
transferred = live_info.get('bytesTransferred', 0)
if expected_size > 0 and transferred < expected_size:
# State says complete but bytes don't match — keep current status
task_status['status'] = task['status']
print(f"⚠️ Task {task_id} state says complete but bytes incomplete ({transferred}/{expected_size})")
# NEW VERIFICATION WORKFLOW: Use intermediate post_processing status
# Only set this status once to prevent multiple worker submissions
if task['status'] != 'post_processing':
elif task['status'] != 'post_processing':
task_status['status'] = 'post_processing'
task['status'] = 'post_processing'
print(f"🔄 Task {task_id} API reports 'Succeeded' - starting post-processing verification")

Loading…
Cancel
Save