diff --git a/web_server.py b/web_server.py index 1fb95437..17dfe9e1 100644 --- a/web_server.py +++ b/web_server.py @@ -614,7 +614,19 @@ class WebUIDownloadMonitor: # Must exclude error states first (matching _build_batch_status_data's prioritized checking) has_error = ('Errored' in state or 'Failed' in state or 'Rejected' in state or 'TimedOut' in state) has_completion = ('Completed' in state or 'Succeeded' in state) + # Verify bytes actually transferred before trusting state string. + # slskd can report "Completed" before the full file is flushed to disk, + # or on connection drops that leave a partial file. + if has_completion and not has_error: + expected_size = live_info.get('size', 0) + transferred = live_info.get('bytesTransferred', 0) + if expected_size > 0 and transferred < expected_size: + if not task.get('_incomplete_warned'): + print(f"⚠️ Monitor: {task_id} state={state} but bytes incomplete ({transferred}/{expected_size}) — waiting") + task['_incomplete_warned'] = True + continue if has_completion and not has_error and task['status'] == 'downloading': + task.pop('_incomplete_warned', None) # CRITICAL FIX: Transition to 'post_processing' HERE so downloads # don't depend on browser polling to trigger post-processing. # Previously, post-processing was only submitted by _build_batch_status_data @@ -1380,7 +1392,11 @@ def _prepare_stream_task(track_data): # Track queue state timing (matching GUI logic) is_queued = ('queued' in download_state or 'initializing' in download_state) is_downloading = ('inprogress' in download_state or 'transferring' in download_state) - is_completed = ('succeeded' in download_state or api_progress >= 100) + # Verify bytes match before trusting state/progress + _stream_expected = download_status.get('size', 0) + _stream_transferred = download_status.get('bytesTransferred', 0) + _bytes_ok = _stream_expected <= 0 or _stream_transferred >= _stream_expected + is_completed = ('succeeded' in download_state or api_progress >= 100) and _bytes_ok # Handle queue state timing if is_queued and queue_start_time is None: @@ -1416,11 +1432,24 @@ def _prepare_stream_task(track_data): # Check if download is complete if is_completed: print(f"✓ Download completed via API status: {original_state}") - - # Give file system time to sync - time.sleep(1) - + + # Wait for file to stabilise on disk before moving found_file = _find_downloaded_file(download_path, track_data) + if found_file: + _prev_sz = -1 + for _sc in range(4): + try: + _cur_sz = os.path.getsize(found_file) + except OSError: + _cur_sz = -1 + if _cur_sz == _prev_sz and _cur_sz > 0: + break + _prev_sz = _cur_sz + time.sleep(1.5) + + # Re-find in case it wasn't found on first try + if not found_file: + found_file = _find_downloaded_file(download_path, track_data) # Retry file search a few times (matching GUI logic) retry_attempts = 5 @@ -4615,6 +4644,10 @@ def get_download_status(): # Don't return early if no Soulseek transfers - YouTube downloads need to be checked too! all_transfers = [] completed_matched_downloads = [] + # Track files already claimed this poll cycle to prevent two contexts from + # grabbing the same physical file when downloads share a basename (e.g., + # "07 - Aurora.flac" from two different albums/artists). + _files_claimed_this_cycle = set() if not transfers_data: # No Soulseek transfers, but continue to check YouTube downloads below @@ -4633,6 +4666,11 @@ def get_download_status(): # Check for completion state if ('succeeded' in state or 'completed' in state) and 'errored' not in state and 'rejected' not in state: + # Verify bytes actually transferred before trusting state + _fi_size = file_info.get('size', 0) + _fi_transferred = file_info.get('bytesTransferred', 0) + if _fi_size > 0 and _fi_transferred < _fi_size: + continue # Not truly complete yet filename_from_api = file_info.get('filename') if not filename_from_api: continue @@ -4698,17 +4736,23 @@ def get_download_status(): found_path = found_result[0] if found_result and found_result[0] else None if found_path: - print(f"🎯 Found completed matched file on disk: {found_path}") - completed_matched_downloads.append((context_key, context, found_path)) - # Don't add to _processed_download_ids yet - wait until thread starts successfully - - # Clean up retry tracking if file was found after retries - with _download_retry_lock: - if context_key in _download_retry_attempts: - retry_count = _download_retry_attempts[context_key]['count'] - elapsed = time.time() - _download_retry_attempts[context_key]['first_attempt'] - print(f"✅ File found after {retry_count} retry attempt(s) ({elapsed:.1f}s): {os.path.basename(filename_from_api)}") - del _download_retry_attempts[context_key] + # Prevent two contexts from claiming the same physical file + _norm_path = os.path.normpath(found_path) + if _norm_path in _files_claimed_this_cycle: + print(f"⚠️ File already claimed by another context this cycle: {os.path.basename(found_path)} — deferring to next poll") + else: + _files_claimed_this_cycle.add(_norm_path) + print(f"🎯 Found completed matched file on disk: {found_path}") + completed_matched_downloads.append((context_key, context, found_path)) + # Don't add to _processed_download_ids yet - wait until thread starts successfully + + # Clean up retry tracking if file was found after retries + with _download_retry_lock: + if context_key in _download_retry_attempts: + retry_count = _download_retry_attempts[context_key]['count'] + elapsed = time.time() - _download_retry_attempts[context_key]['first_attempt'] + print(f"✅ File found after {retry_count} retry attempt(s) ({elapsed:.1f}s): {os.path.basename(filename_from_api)}") + del _download_retry_attempts[context_key] else: # File not found yet - implement retry logic instead of immediate give-up # This fixes race condition where slskd reports completion before file is written to disk @@ -4785,7 +4829,9 @@ def get_download_status(): all_transfers.append(youtube_transfer) # Check if YouTube download is completed and needs post-processing - if download.state and ('succeeded' in download.state.lower() or 'completed' in download.state.lower()): + # Verify bytes match before trusting state string + _yt_bytes_ok = download.size <= 0 or download.transferred >= download.size + if _yt_bytes_ok and download.state and ('succeeded' in download.state.lower() or 'completed' in download.state.lower()): context_key = _make_context_key(download.username, download.filename) with matched_context_lock: @@ -4797,6 +4843,12 @@ def get_download_status(): found_path = found_result[0] if found_result and found_result[0] else None if found_path: + # Prevent two contexts from claiming the same physical file + _yt_norm = os.path.normpath(found_path) + if _yt_norm in _files_claimed_this_cycle: + print(f"⚠️ [YouTube] File already claimed this cycle: {os.path.basename(found_path)} — deferring") + continue + _files_claimed_this_cycle.add(_yt_norm) print(f"🎯 [YouTube] Found completed matched file on disk: {found_path}") # Start post-processing thread def process_youtube_download(): @@ -9953,6 +10005,8 @@ def _safe_move_file(src, dst): with open(src, 'rb') as f_src: with open(dst, 'wb') as f_dst: shutil.copyfileobj(f_src, f_dst) + f_dst.flush() + os.fsync(f_dst.fileno()) # Delete source after successful copy try: @@ -9996,12 +10050,27 @@ def _post_process_matched_download(context_key, context, file_path): import time from pathlib import Path - # --- GUI PARITY FIX: Add a delay to prevent file lock race conditions --- - # The GUI app waits 1 second to ensure the file handle is released by - # the download client before attempting to move or modify it. - print(f"⏳ Waiting 1 second for file handle release for: {os.path.basename(file_path)}") - time.sleep(1) - # --- END OF FIX --- + # --- FILE STABILITY CHECK --- + # Wait for the file to stop growing before processing. slskd may still be + # flushing write buffers when it reports "Completed". We poll the file size + # a few times; once it stabilises we know the write is finished. + _basename = os.path.basename(file_path) + _prev_size = -1 + for _stability_check in range(5): + try: + _cur_size = os.path.getsize(file_path) + except OSError: + _cur_size = -1 + if _cur_size == _prev_size and _cur_size > 0: + # Size unchanged — file is stable + break + _prev_size = _cur_size + if _stability_check == 0: + print(f"⏳ Waiting for file to stabilise: {_basename} ({_cur_size} bytes)") + time.sleep(1.5) + else: + print(f"⚠️ File may still be writing after stability checks: {_basename} ({_prev_size} bytes)") + # --- END FILE STABILITY CHECK --- # --- ACOUSTID VERIFICATION --- # Optional verification that downloaded audio matches expected track. @@ -15905,9 +15974,16 @@ def _build_batch_status_data(batch_id, batch, live_transfers_lookup): task_status['status'] = 'downloading' # Default to downloading for error detection task['status'] = 'downloading' elif 'Completed' in state_str or 'Succeeded' in state_str: + # Verify bytes actually transferred before trusting state string + expected_size = live_info.get('size', 0) + transferred = live_info.get('bytesTransferred', 0) + if expected_size > 0 and transferred < expected_size: + # State says complete but bytes don't match — keep current status + task_status['status'] = task['status'] + print(f"⚠️ Task {task_id} state says complete but bytes incomplete ({transferred}/{expected_size})") # NEW VERIFICATION WORKFLOW: Use intermediate post_processing status # Only set this status once to prevent multiple worker submissions - if task['status'] != 'post_processing': + elif task['status'] != 'post_processing': task_status['status'] = 'post_processing' task['status'] = 'post_processing' print(f"🔄 Task {task_id} API reports 'Succeeded' - starting post-processing verification")