From d4112ae1319ef8faa59e43954b09edc7099af6f4 Mon Sep 17 00:00:00 2001 From: Broque Thomas Date: Sun, 7 Sep 2025 23:53:44 -0700 Subject: [PATCH] Update web_server.py --- web_server.py | 321 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 277 insertions(+), 44 deletions(-) diff --git a/web_server.py b/web_server.py index 55554d61..18aeda71 100644 --- a/web_server.py +++ b/web_server.py @@ -244,7 +244,7 @@ class WebUIDownloadMonitor: continue # Check for timeouts and errors - retries handled directly in _should_retry_task - self._should_retry_task(task, live_transfers_lookup, current_time) + self._should_retry_task(task_id, task, live_transfers_lookup, current_time) # ENHANCED: Add worker count validation to detect ghost workers self._validate_worker_counts() @@ -282,7 +282,7 @@ class WebUIDownloadMonitor: print(f"⚠️ Monitor: Could not fetch live transfers: {e}") return {} - def _should_retry_task(self, task, live_transfers_lookup, current_time): + def _should_retry_task(self, task_id, task, live_transfers_lookup, current_time): """Determine if a task should be retried due to timeout (matches GUI logic)""" task_filename = task.get('filename') or task['track_info'].get('filename') task_username = task.get('username') or task['track_info'].get('username') @@ -312,7 +312,52 @@ class WebUIDownloadMonitor: print(f"🚨 Task errored (state: {state_str}) - immediate retry {retry_count + 1}/3") task['error_retry_count'] = retry_count + 1 task['last_error_retry_time'] = current_time - return True + + # CRITICAL: Cancel the errored download in slskd before retry + username = task.get('username') or task['track_info'].get('username') + filename = task.get('filename') or task['track_info'].get('filename') + download_id = task.get('download_id') + + if username and download_id: + try: + print(f"🚫 Cancelling errored download: {download_id} from {username}") + asyncio.run(soulseek_client.cancel_download(download_id, username, remove=True)) + print(f"✅ Successfully cancelled errored download {download_id}") + except Exception as cancel_error: + print(f"⚠️ Warning: Failed to cancel errored download {download_id}: {cancel_error}") + + # Mark current source as used to prevent retry loops + if username and filename: + used_sources = task.get('used_sources', set()) + source_key = f"{username}_{os.path.basename(filename)}" + used_sources.add(source_key) + task['used_sources'] = used_sources + print(f"🚫 Marked errored source as used: {source_key}") + + # Clear download info since we cancelled it + task.pop('download_id', None) + task.pop('username', None) + task.pop('filename', None) + + # Reset task state for immediate retry + task['status'] = 'searching' + task.pop('queued_start_time', None) + task.pop('downloading_start_time', None) + task['status_change_time'] = current_time + print(f"🔄 Task {task.get('track_info', {}).get('name', 'Unknown')} reset for error retry") + + # CRITICAL: Immediately restart worker for error retry - don't rely on normal queue processing + batch_id = task.get('batch_id') + if task_id and batch_id: + try: + print(f"🚀 [Error Retry] Immediately restarting worker for task {task_id}") + missing_download_executor.submit(_download_track_worker, task_id, batch_id) + print(f"✅ [Error Retry] Successfully restarted worker for task {task_id}") + except Exception as restart_error: + print(f"❌ [Error Retry] Failed to restart worker for task {task_id}: {restart_error}") + task['status'] = 'failed' + task['error_message'] = f'Failed to restart worker: {restart_error}' + return False elif retry_count < 3: # Wait a bit before next error retry return False @@ -348,10 +393,21 @@ class WebUIDownloadMonitor: task['stuck_retry_count'] = retry_count + 1 task['last_retry_time'] = current_time - # UNIFIED RETRY LOGIC: Handle timeout retry exactly like error retry - # Mark current source as used to prevent retry loops + # CRITICAL: Cancel the stuck download in slskd before retry username = task.get('username') or task['track_info'].get('username') filename = task.get('filename') or task['track_info'].get('filename') + download_id = task.get('download_id') + + if username and download_id: + try: + print(f"🚫 Cancelling stuck queued download: {download_id} from {username}") + asyncio.run(soulseek_client.cancel_download(download_id, username, remove=True)) + print(f"✅ Successfully cancelled stuck download {download_id}") + except Exception as cancel_error: + print(f"⚠️ Warning: Failed to cancel stuck download {download_id}: {cancel_error}") + + # UNIFIED RETRY LOGIC: Handle timeout retry exactly like error retry + # Mark current source as used to prevent retry loops if username and filename: used_sources = task.get('used_sources', set()) source_key = f"{username}_{os.path.basename(filename)}" @@ -359,13 +415,30 @@ class WebUIDownloadMonitor: task['used_sources'] = used_sources print(f"🚫 Marked timeout source as used: {source_key}") + # Clear download info since we cancelled it + task.pop('download_id', None) + task.pop('username', None) + task.pop('filename', None) + # Reset task state for immediate retry (like error retry) task['status'] = 'searching' task.pop('queued_start_time', None) task.pop('downloading_start_time', None) task['status_change_time'] = current_time print(f"🔄 Task {task.get('track_info', {}).get('name', 'Unknown')} reset for timeout retry") - return False # Don't trigger monitor retry - task will be picked up by normal flow + + # CRITICAL: Immediately restart worker for timeout retry - don't rely on normal queue processing + batch_id = task.get('batch_id') + if task_id and batch_id: + try: + print(f"🚀 [Timeout Retry] Immediately restarting worker for task {task_id}") + missing_download_executor.submit(_download_track_worker, task_id, batch_id) + print(f"✅ [Timeout Retry] Successfully restarted worker for task {task_id}") + except Exception as restart_error: + print(f"❌ [Timeout Retry] Failed to restart worker for task {task_id}: {restart_error}") + task['status'] = 'failed' + task['error_message'] = f'Failed to restart worker: {restart_error}' + return False elif retry_count < 3: # Wait longer before next retry return False @@ -403,10 +476,21 @@ class WebUIDownloadMonitor: task['stuck_retry_count'] = retry_count + 1 task['last_retry_time'] = current_time - # UNIFIED RETRY LOGIC: Handle 0% timeout retry exactly like error retry - # Mark current source as used to prevent retry loops + # CRITICAL: Cancel the stuck download in slskd before retry username = task.get('username') or task['track_info'].get('username') filename = task.get('filename') or task['track_info'].get('filename') + download_id = task.get('download_id') + + if username and download_id: + try: + print(f"🚫 Cancelling stuck 0% download: {download_id} from {username}") + asyncio.run(soulseek_client.cancel_download(download_id, username, remove=True)) + print(f"✅ Successfully cancelled stuck 0% download {download_id}") + except Exception as cancel_error: + print(f"⚠️ Warning: Failed to cancel stuck 0% download {download_id}: {cancel_error}") + + # UNIFIED RETRY LOGIC: Handle 0% timeout retry exactly like error retry + # Mark current source as used to prevent retry loops if username and filename: used_sources = task.get('used_sources', set()) source_key = f"{username}_{os.path.basename(filename)}" @@ -414,13 +498,30 @@ class WebUIDownloadMonitor: task['used_sources'] = used_sources print(f"🚫 Marked 0% progress source as used: {source_key}") + # Clear download info since we cancelled it + task.pop('download_id', None) + task.pop('username', None) + task.pop('filename', None) + # Reset task state for immediate retry (like error retry) task['status'] = 'searching' task.pop('queued_start_time', None) task.pop('downloading_start_time', None) task['status_change_time'] = current_time print(f"🔄 Task {task.get('track_info', {}).get('name', 'Unknown')} reset for 0% retry") - return False # Don't trigger monitor retry - task will be picked up by normal flow + + # CRITICAL: Immediately restart worker for 0% retry - don't rely on normal queue processing + batch_id = task.get('batch_id') + if task_id and batch_id: + try: + print(f"🚀 [0% Retry] Immediately restarting worker for task {task_id}") + missing_download_executor.submit(_download_track_worker, task_id, batch_id) + print(f"✅ [0% Retry] Successfully restarted worker for task {task_id}") + except Exception as restart_error: + print(f"❌ [0% Retry] Failed to restart worker for task {task_id}: {restart_error}") + task['status'] = 'failed' + task['error_message'] = f'Failed to restart worker: {restart_error}' + return False elif retry_count < 3: # Wait longer before next retry return False @@ -1500,11 +1601,14 @@ def start_download(): -def _find_completed_file_robust(download_dir, api_filename): +def _find_completed_file_robust(download_dir, api_filename, transfer_dir=None): """ Robustly finds a completed file on disk, accounting for name variations and unexpected subdirectories. This version uses the superior normalization logic from the GUI's matching_engine.py to ensure consistency. + + First searches in download_dir, then optionally searches in transfer_dir if provided. + Returns tuple (file_path, location) where location is 'downloads' or 'transfer'. """ import re import os @@ -1523,36 +1627,63 @@ def _find_completed_file_robust(download_dir, api_filename): # Consolidate multiple spaces return ' '.join(text.split()).strip() - target_basename = os.path.basename(api_filename) - normalized_target = normalize_for_finding(target_basename) - print(f" searching for normalized filename '{normalized_target}' in '{download_dir}'...") + def search_in_directory(search_dir, location_name): + """Search for the file in a specific directory.""" + print(f" searching for normalized filename '{normalized_target}' in '{search_dir}'...") + + best_match_path = None + highest_similarity = 0.0 - best_match_path = None - highest_similarity = 0.0 + # Walk through the entire directory + for root, _, files in os.walk(search_dir): + for file in files: + # Direct match is the best case + if os.path.basename(file) == target_basename: + file_path = os.path.join(root, file) + print(f"Found exact match: {file_path}") + return file_path, 1.0 + + # Fuzzy matching for variations + normalized_file = normalize_for_finding(file) + similarity = SequenceMatcher(None, normalized_target, normalized_file).ratio() - # Walk through the entire download directory - for root, _, files in os.walk(download_dir): - for file in files: - # Direct match is the best case - if os.path.basename(file) == target_basename: - print(f"Found exact match: {os.path.join(root, file)}") - return os.path.join(root, file) - - # Fuzzy matching for variations - normalized_file = normalize_for_finding(file) - similarity = SequenceMatcher(None, normalized_target, normalized_file).ratio() + if similarity > highest_similarity: + highest_similarity = similarity + best_match_path = os.path.join(root, file) + + return best_match_path, highest_similarity + + target_basename = os.path.basename(api_filename) + normalized_target = normalize_for_finding(target_basename) - if similarity > highest_similarity: - highest_similarity = similarity - best_match_path = os.path.join(root, file) + # First search in downloads directory + best_downloads_path, downloads_similarity = search_in_directory(download_dir, 'downloads') # Use a high confidence threshold for fuzzy matches to avoid incorrect files - if highest_similarity > 0.85: - print(f"Found best fuzzy match with similarity {highest_similarity:.2f}: {best_match_path}") - return best_match_path + if downloads_similarity > 0.85: + location = 'downloads' + if downloads_similarity == 1.0: + print(f"✅ Found exact match in downloads: {best_downloads_path}") + else: + print(f"✅ Found best fuzzy match in downloads with similarity {downloads_similarity:.2f}: {best_downloads_path}") + return (best_downloads_path, location) + + # If not found in downloads and transfer_dir is provided, search there + transfer_similarity = 0.0 # Initialize transfer_similarity + if transfer_dir and os.path.exists(transfer_dir): + print(f"🔍 File not found in downloads, checking transfer folder...") + best_transfer_path, transfer_similarity = search_in_directory(transfer_dir, 'transfer') + + if transfer_similarity > 0.85: + location = 'transfer' + if transfer_similarity == 1.0: + print(f"✅ Found exact match in transfer: {best_transfer_path}") + else: + print(f"✅ Found best fuzzy match in transfer with similarity {transfer_similarity:.2f}: {best_transfer_path}") + return (best_transfer_path, location) - print(f"Could not find a confident match for '{target_basename}'. Highest similarity was {highest_similarity:.2f}.") - return None + print(f"❌ Could not find a confident match for '{target_basename}' in any location. Best similarity was {max(downloads_similarity, transfer_similarity):.2f}.") + return (None, None) @@ -1599,8 +1730,9 @@ def get_download_status(): if context and context_key not in _processed_download_ids: download_dir = config_manager.get('soulseek.download_path', './downloads') - # Use the new robust file finder - found_path = _find_completed_file_robust(download_dir, filename_from_api) + # Use the new robust file finder (only search downloads for post-processing candidates) + found_result = _find_completed_file_robust(download_dir, filename_from_api) + found_path = found_result[0] if found_result and found_result[0] else None if found_path: print(f"🎯 Found completed matched file on disk: {found_path}") @@ -1626,11 +1758,9 @@ def get_download_status(): _processed_download_ids.add(context_key) print(f"✅ Marked as processed: {context_key}") - # Remove context so it's not processed again - with matched_context_lock: - if context_key in matched_downloads_context: - del matched_downloads_context[context_key] - print(f"🗑️ Removed context: {context_key}") + # DON'T remove context immediately - verification worker needs it + # Context will be cleaned up by verification worker after both processors complete + print(f"💾 Keeping context for verification worker: {context_key}") except Exception as e: print(f"❌ Error starting post-processing thread for {context_key}: {e}") @@ -4102,6 +4232,13 @@ def _post_process_matched_download_with_verification(context_key, context, file_ if task_id in download_tasks: download_tasks[task_id]['status'] = 'completed' print(f"✅ [Verification] Task {task_id} marked as completed after verification") + + # Clean up context now that both stream processor and verification worker are done + with matched_context_lock: + if context_key in matched_downloads_context: + del matched_downloads_context[context_key] + print(f"🗑️ [Verification] Cleaned up context after successful verification: {context_key}") + # FIXED: Call completion callback now since we prevented original post-processing from calling it print(f"✅ [Verification] Task {task_id} verification complete - calling batch completion callback") _on_download_completed(batch_id, task_id, success=True) @@ -4111,6 +4248,13 @@ def _post_process_matched_download_with_verification(context_key, context, file_ if task_id in download_tasks: download_tasks[task_id]['status'] = 'failed' download_tasks[task_id]['error_message'] = "File move to transfer folder failed." + + # Clean up context even on failure to prevent memory leaks + with matched_context_lock: + if context_key in matched_downloads_context: + del matched_downloads_context[context_key] + print(f"🗑️ [Verification] Cleaned up context after verification failure: {context_key}") + _on_download_completed(batch_id, task_id, success=False) except Exception as e: @@ -4121,6 +4265,13 @@ def _post_process_matched_download_with_verification(context_key, context, file_ if task_id in download_tasks: download_tasks[task_id]['status'] = 'failed' download_tasks[task_id]['error_message'] = f"Post-processing verification failed: {str(e)}" + + # Clean up context even on exception to prevent memory leaks + with matched_context_lock: + if context_key in matched_downloads_context: + del matched_downloads_context[context_key] + print(f"🗑️ [Verification] Cleaned up context after exception: {context_key}") + _on_download_completed(batch_id, task_id, success=False) @@ -5456,15 +5607,74 @@ def _run_post_processing_worker(task_id, batch_id): return download_dir = config_manager.get('soulseek.download_path', './downloads') + transfer_dir = config_manager.get('soulseek.transfer_path', './transfer') + + # Try to get context for generating the correct final filename + context_key = f"{task_username}::{os.path.basename(task_filename)}" + expected_final_filename = None + + print(f"🔍 [Post-Processing] Looking up context with key: {context_key}") + + with matched_context_lock: + context = matched_downloads_context.get(context_key) + # Debug: Show all available context keys + available_keys = list(matched_downloads_context.keys()) + print(f"🔍 [Post-Processing] Available context keys: {available_keys[:10]}...") # Show first 10 keys + + if context: + print(f"✅ [Post-Processing] Found context for key: {context_key}") + try: + original_search = context.get("original_search_result", {}) + print(f"🔍 [Post-Processing] original_search keys: {list(original_search.keys())}") + + spotify_clean_title = original_search.get('spotify_clean_title') + track_number = original_search.get('track_number') + + print(f"🔍 [Post-Processing] spotify_clean_title: '{spotify_clean_title}', track_number: {track_number}") + + if spotify_clean_title and track_number: + # Generate expected final filename that stream processor would create + # Pattern: f"{track_number:02d} - {clean_title}.flac" + sanitized_title = spotify_clean_title.replace('/', '_').replace('\\', '_').replace(':', '_').replace('*', '_').replace('?', '_').replace('"', '_').replace('<', '_').replace('>', '_').replace('|', '_') + expected_final_filename = f"{track_number:02d} - {sanitized_title}.flac" + print(f"🎯 [Post-Processing] Generated expected final filename: {expected_final_filename}") + else: + print(f"❌ [Post-Processing] Missing required data - spotify_clean_title: {bool(spotify_clean_title)}, track_number: {bool(track_number)}") + except Exception as e: + print(f"⚠️ [Post-Processing] Error generating expected filename: {e}") + import traceback + traceback.print_exc() + else: + print(f"❌ [Post-Processing] No context found for key: {context_key}") + # Try to find similar keys to debug the issue + similar_keys = [k for k in matched_downloads_context.keys() if os.path.basename(task_filename) in k] + if similar_keys: + print(f"🔍 [Post-Processing] Similar keys found: {similar_keys}") + else: + print(f"🔍 [Post-Processing] No similar keys found containing '{os.path.basename(task_filename)}'") + # Show a sample of what keys actually exist + sample_keys = list(matched_downloads_context.keys())[:5] + print(f"🔍 [Post-Processing] Sample of existing keys: {sample_keys}") # RESILIENT FILE-FINDING LOOP: Try up to 3 times with delays found_file = None + file_location = None for retry_count in range(3): print(f"🔍 [Post-Processing] Attempt {retry_count + 1}/3 to find file: {os.path.basename(task_filename)}") - found_file = _find_completed_file_robust(download_dir, task_filename) + + # First try with original filename + found_file, file_location = _find_completed_file_robust(download_dir, task_filename, transfer_dir) + + # If not found and we have an expected final filename, try that in transfer folder + if not found_file and expected_final_filename: + print(f"🔍 [Post-Processing] Trying with expected final filename: {expected_final_filename}") + found_result = _find_completed_file_robust(transfer_dir, expected_final_filename) + if found_result and found_result[0]: + found_file, file_location = found_result[0], 'transfer' + print(f"✅ [Post-Processing] Found file with expected final filename: {found_file}") if found_file: - print(f"✅ [Post-Processing] Found file after {retry_count + 1} attempts: {found_file}") + print(f"✅ [Post-Processing] Found file after {retry_count + 1} attempts in {file_location}: {found_file}") break else: if retry_count < 2: # Don't sleep on final attempt @@ -5480,7 +5690,23 @@ def _run_post_processing_worker(task_id, batch_id): _on_download_completed(batch_id, task_id, success=False) return - # File found - now attempt post-processing + # Handle file found in transfer folder - already completed by stream processor + if file_location == 'transfer': + print(f"🎯 [Post-Processing] File found in transfer folder - already completed by stream processor: {found_file}") + with tasks_lock: + if task_id in download_tasks: + download_tasks[task_id]['status'] = 'completed' + + # Clean up context now that both stream processor and verification worker are done + with matched_context_lock: + if context_key in matched_downloads_context: + del matched_downloads_context[context_key] + print(f"🗑️ [Verification] Cleaned up context after successful verification: {context_key}") + + _on_download_completed(batch_id, task_id, success=True) + return + + # File found in downloads folder - attempt post-processing try: # Create context for post-processing (similar to existing matched download logic) context_key = f"{task_username}::{os.path.basename(task_filename)}" @@ -5499,6 +5725,13 @@ def _run_post_processing_worker(task_id, batch_id): with tasks_lock: if task_id in download_tasks: download_tasks[task_id]['status'] = 'completed' + + # Clean up context if it exists (might be leftover from stream processor) + with matched_context_lock: + if context_key in matched_downloads_context: + del matched_downloads_context[context_key] + print(f"🗑️ [Verification] Cleaned up leftover context: {context_key}") + # Call completion callback since there's no other post-processing to handle it _on_download_completed(batch_id, task_id, success=True)