Update web_server.py

pull/15/head
Broque Thomas 8 months ago
parent 82253017d2
commit d4112ae131

@ -244,7 +244,7 @@ class WebUIDownloadMonitor:
continue
# Check for timeouts and errors - retries handled directly in _should_retry_task
self._should_retry_task(task, live_transfers_lookup, current_time)
self._should_retry_task(task_id, task, live_transfers_lookup, current_time)
# ENHANCED: Add worker count validation to detect ghost workers
self._validate_worker_counts()
@ -282,7 +282,7 @@ class WebUIDownloadMonitor:
print(f"⚠️ Monitor: Could not fetch live transfers: {e}")
return {}
def _should_retry_task(self, task, live_transfers_lookup, current_time):
def _should_retry_task(self, task_id, task, live_transfers_lookup, current_time):
"""Determine if a task should be retried due to a timeout or an errored transfer (matches GUI logic)"""
task_filename = task.get('filename') or task['track_info'].get('filename')
task_username = task.get('username') or task['track_info'].get('username')
@ -312,7 +312,52 @@ class WebUIDownloadMonitor:
print(f"🚨 Task errored (state: {state_str}) - immediate retry {retry_count + 1}/3")
task['error_retry_count'] = retry_count + 1
task['last_error_retry_time'] = current_time
return True
# CRITICAL: Cancel the errored download in slskd before retry
username = task.get('username') or task['track_info'].get('username')
filename = task.get('filename') or task['track_info'].get('filename')
download_id = task.get('download_id')
if username and download_id:
try:
print(f"🚫 Cancelling errored download: {download_id} from {username}")
asyncio.run(soulseek_client.cancel_download(download_id, username, remove=True))
print(f"✅ Successfully cancelled errored download {download_id}")
except Exception as cancel_error:
print(f"⚠️ Warning: Failed to cancel errored download {download_id}: {cancel_error}")
# Mark current source as used to prevent retry loops
if username and filename:
used_sources = task.get('used_sources', set())
source_key = f"{username}_{os.path.basename(filename)}"
used_sources.add(source_key)
task['used_sources'] = used_sources
print(f"🚫 Marked errored source as used: {source_key}")
# Clear download info since we cancelled it
task.pop('download_id', None)
task.pop('username', None)
task.pop('filename', None)
# Reset task state for immediate retry
task['status'] = 'searching'
task.pop('queued_start_time', None)
task.pop('downloading_start_time', None)
task['status_change_time'] = current_time
print(f"🔄 Task {task.get('track_info', {}).get('name', 'Unknown')} reset for error retry")
# CRITICAL: Immediately restart worker for error retry - don't rely on normal queue processing
batch_id = task.get('batch_id')
if task_id and batch_id:
try:
print(f"🚀 [Error Retry] Immediately restarting worker for task {task_id}")
missing_download_executor.submit(_download_track_worker, task_id, batch_id)
print(f"✅ [Error Retry] Successfully restarted worker for task {task_id}")
except Exception as restart_error:
print(f"❌ [Error Retry] Failed to restart worker for task {task_id}: {restart_error}")
task['status'] = 'failed'
task['error_message'] = f'Failed to restart worker: {restart_error}'
return False
elif retry_count < 3:
# Wait a bit before next error retry
return False
@ -348,10 +393,21 @@ class WebUIDownloadMonitor:
task['stuck_retry_count'] = retry_count + 1
task['last_retry_time'] = current_time
# UNIFIED RETRY LOGIC: Handle timeout retry exactly like error retry
# Mark current source as used to prevent retry loops
# CRITICAL: Cancel the stuck download in slskd before retry
username = task.get('username') or task['track_info'].get('username')
filename = task.get('filename') or task['track_info'].get('filename')
download_id = task.get('download_id')
if username and download_id:
try:
print(f"🚫 Cancelling stuck queued download: {download_id} from {username}")
asyncio.run(soulseek_client.cancel_download(download_id, username, remove=True))
print(f"✅ Successfully cancelled stuck download {download_id}")
except Exception as cancel_error:
print(f"⚠️ Warning: Failed to cancel stuck download {download_id}: {cancel_error}")
# UNIFIED RETRY LOGIC: Handle timeout retry exactly like error retry
# Mark current source as used to prevent retry loops
if username and filename:
used_sources = task.get('used_sources', set())
source_key = f"{username}_{os.path.basename(filename)}"
@ -359,13 +415,30 @@ class WebUIDownloadMonitor:
task['used_sources'] = used_sources
print(f"🚫 Marked timeout source as used: {source_key}")
# Clear download info since we cancelled it
task.pop('download_id', None)
task.pop('username', None)
task.pop('filename', None)
# Reset task state for immediate retry (like error retry)
task['status'] = 'searching'
task.pop('queued_start_time', None)
task.pop('downloading_start_time', None)
task['status_change_time'] = current_time
print(f"🔄 Task {task.get('track_info', {}).get('name', 'Unknown')} reset for timeout retry")
return False # Don't trigger monitor retry - task will be picked up by normal flow
# CRITICAL: Immediately restart worker for timeout retry - don't rely on normal queue processing
batch_id = task.get('batch_id')
if task_id and batch_id:
try:
print(f"🚀 [Timeout Retry] Immediately restarting worker for task {task_id}")
missing_download_executor.submit(_download_track_worker, task_id, batch_id)
print(f"✅ [Timeout Retry] Successfully restarted worker for task {task_id}")
except Exception as restart_error:
print(f"❌ [Timeout Retry] Failed to restart worker for task {task_id}: {restart_error}")
task['status'] = 'failed'
task['error_message'] = f'Failed to restart worker: {restart_error}'
return False
elif retry_count < 3:
# Wait longer before next retry
return False
@ -403,10 +476,21 @@ class WebUIDownloadMonitor:
task['stuck_retry_count'] = retry_count + 1
task['last_retry_time'] = current_time
# UNIFIED RETRY LOGIC: Handle 0% timeout retry exactly like error retry
# Mark current source as used to prevent retry loops
# CRITICAL: Cancel the stuck download in slskd before retry
username = task.get('username') or task['track_info'].get('username')
filename = task.get('filename') or task['track_info'].get('filename')
download_id = task.get('download_id')
if username and download_id:
try:
print(f"🚫 Cancelling stuck 0% download: {download_id} from {username}")
asyncio.run(soulseek_client.cancel_download(download_id, username, remove=True))
print(f"✅ Successfully cancelled stuck 0% download {download_id}")
except Exception as cancel_error:
print(f"⚠️ Warning: Failed to cancel stuck 0% download {download_id}: {cancel_error}")
# UNIFIED RETRY LOGIC: Handle 0% timeout retry exactly like error retry
# Mark current source as used to prevent retry loops
if username and filename:
used_sources = task.get('used_sources', set())
source_key = f"{username}_{os.path.basename(filename)}"
@ -414,13 +498,30 @@ class WebUIDownloadMonitor:
task['used_sources'] = used_sources
print(f"🚫 Marked 0% progress source as used: {source_key}")
# Clear download info since we cancelled it
task.pop('download_id', None)
task.pop('username', None)
task.pop('filename', None)
# Reset task state for immediate retry (like error retry)
task['status'] = 'searching'
task.pop('queued_start_time', None)
task.pop('downloading_start_time', None)
task['status_change_time'] = current_time
print(f"🔄 Task {task.get('track_info', {}).get('name', 'Unknown')} reset for 0% retry")
return False # Don't trigger monitor retry - task will be picked up by normal flow
# CRITICAL: Immediately restart worker for 0% retry - don't rely on normal queue processing
batch_id = task.get('batch_id')
if task_id and batch_id:
try:
print(f"🚀 [0% Retry] Immediately restarting worker for task {task_id}")
missing_download_executor.submit(_download_track_worker, task_id, batch_id)
print(f"✅ [0% Retry] Successfully restarted worker for task {task_id}")
except Exception as restart_error:
print(f"❌ [0% Retry] Failed to restart worker for task {task_id}: {restart_error}")
task['status'] = 'failed'
task['error_message'] = f'Failed to restart worker: {restart_error}'
return False
elif retry_count < 3:
# Wait longer before next retry
return False
@ -1500,11 +1601,14 @@ def start_download():
def _find_completed_file_robust(download_dir, api_filename):
def _find_completed_file_robust(download_dir, api_filename, transfer_dir=None):
"""
Robustly finds a completed file on disk, accounting for name variations and
unexpected subdirectories. This version uses the superior normalization logic
from the GUI's matching_engine.py to ensure consistency.
First searches in download_dir, then optionally searches in transfer_dir if provided.
Returns a tuple (file_path, location) where location is 'downloads' or 'transfer', or (None, None) when no confident match is found.
"""
import re
import os
@ -1523,36 +1627,63 @@ def _find_completed_file_robust(download_dir, api_filename):
# Consolidate multiple spaces
return ' '.join(text.split()).strip()
target_basename = os.path.basename(api_filename)
normalized_target = normalize_for_finding(target_basename)
print(f" searching for normalized filename '{normalized_target}' in '{download_dir}'...")
def search_in_directory(search_dir, location_name):
"""Search for the file in a specific directory."""
print(f" searching for normalized filename '{normalized_target}' in '{search_dir}'...")
best_match_path = None
highest_similarity = 0.0
best_match_path = None
highest_similarity = 0.0
# Walk through the entire directory
for root, _, files in os.walk(search_dir):
for file in files:
# Direct match is the best case
if os.path.basename(file) == target_basename:
file_path = os.path.join(root, file)
print(f"Found exact match: {file_path}")
return file_path, 1.0
# Fuzzy matching for variations
normalized_file = normalize_for_finding(file)
similarity = SequenceMatcher(None, normalized_target, normalized_file).ratio()
# Walk through the entire download directory
for root, _, files in os.walk(download_dir):
for file in files:
# Direct match is the best case
if os.path.basename(file) == target_basename:
print(f"Found exact match: {os.path.join(root, file)}")
return os.path.join(root, file)
# Fuzzy matching for variations
normalized_file = normalize_for_finding(file)
similarity = SequenceMatcher(None, normalized_target, normalized_file).ratio()
if similarity > highest_similarity:
highest_similarity = similarity
best_match_path = os.path.join(root, file)
return best_match_path, highest_similarity
target_basename = os.path.basename(api_filename)
normalized_target = normalize_for_finding(target_basename)
if similarity > highest_similarity:
highest_similarity = similarity
best_match_path = os.path.join(root, file)
# First search in downloads directory
best_downloads_path, downloads_similarity = search_in_directory(download_dir, 'downloads')
# Use a high confidence threshold for fuzzy matches to avoid incorrect files
if highest_similarity > 0.85:
print(f"Found best fuzzy match with similarity {highest_similarity:.2f}: {best_match_path}")
return best_match_path
if downloads_similarity > 0.85:
location = 'downloads'
if downloads_similarity == 1.0:
print(f"✅ Found exact match in downloads: {best_downloads_path}")
else:
print(f"✅ Found best fuzzy match in downloads with similarity {downloads_similarity:.2f}: {best_downloads_path}")
return (best_downloads_path, location)
# If not found in downloads and transfer_dir is provided, search there
transfer_similarity = 0.0 # Initialize transfer_similarity
if transfer_dir and os.path.exists(transfer_dir):
print(f"🔍 File not found in downloads, checking transfer folder...")
best_transfer_path, transfer_similarity = search_in_directory(transfer_dir, 'transfer')
if transfer_similarity > 0.85:
location = 'transfer'
if transfer_similarity == 1.0:
print(f"✅ Found exact match in transfer: {best_transfer_path}")
else:
print(f"✅ Found best fuzzy match in transfer with similarity {transfer_similarity:.2f}: {best_transfer_path}")
return (best_transfer_path, location)
print(f"Could not find a confident match for '{target_basename}'. Highest similarity was {highest_similarity:.2f}.")
return None
print(f"Could not find a confident match for '{target_basename}' in any location. Best similarity was {max(downloads_similarity, transfer_similarity):.2f}.")
return (None, None)
@ -1599,8 +1730,9 @@ def get_download_status():
if context and context_key not in _processed_download_ids:
download_dir = config_manager.get('soulseek.download_path', './downloads')
# Use the new robust file finder
found_path = _find_completed_file_robust(download_dir, filename_from_api)
# Use the new robust file finder (only search downloads for post-processing candidates)
found_result = _find_completed_file_robust(download_dir, filename_from_api)
found_path = found_result[0] if found_result and found_result[0] else None
if found_path:
print(f"🎯 Found completed matched file on disk: {found_path}")
@ -1626,11 +1758,9 @@ def get_download_status():
_processed_download_ids.add(context_key)
print(f"✅ Marked as processed: {context_key}")
# Remove context so it's not processed again
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
print(f"🗑️ Removed context: {context_key}")
# DON'T remove context immediately - verification worker needs it
# Context will be cleaned up by verification worker after both processors complete
print(f"💾 Keeping context for verification worker: {context_key}")
except Exception as e:
print(f"❌ Error starting post-processing thread for {context_key}: {e}")
@ -4102,6 +4232,13 @@ def _post_process_matched_download_with_verification(context_key, context, file_
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'completed'
print(f"✅ [Verification] Task {task_id} marked as completed after verification")
# Clean up context now that both stream processor and verification worker are done
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
print(f"🗑️ [Verification] Cleaned up context after successful verification: {context_key}")
# FIXED: Call completion callback now since we prevented original post-processing from calling it
print(f"✅ [Verification] Task {task_id} verification complete - calling batch completion callback")
_on_download_completed(batch_id, task_id, success=True)
@ -4111,6 +4248,13 @@ def _post_process_matched_download_with_verification(context_key, context, file_
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = "File move to transfer folder failed."
# Clean up context even on failure to prevent memory leaks
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
print(f"🗑️ [Verification] Cleaned up context after verification failure: {context_key}")
_on_download_completed(batch_id, task_id, success=False)
except Exception as e:
@ -4121,6 +4265,13 @@ def _post_process_matched_download_with_verification(context_key, context, file_
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = f"Post-processing verification failed: {str(e)}"
# Clean up context even on exception to prevent memory leaks
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
print(f"🗑️ [Verification] Cleaned up context after exception: {context_key}")
_on_download_completed(batch_id, task_id, success=False)
@ -5456,15 +5607,74 @@ def _run_post_processing_worker(task_id, batch_id):
return
download_dir = config_manager.get('soulseek.download_path', './downloads')
transfer_dir = config_manager.get('soulseek.transfer_path', './transfer')
# Try to get context for generating the correct final filename
context_key = f"{task_username}::{os.path.basename(task_filename)}"
expected_final_filename = None
print(f"🔍 [Post-Processing] Looking up context with key: {context_key}")
with matched_context_lock:
context = matched_downloads_context.get(context_key)
# Debug: Show all available context keys
available_keys = list(matched_downloads_context.keys())
print(f"🔍 [Post-Processing] Available context keys: {available_keys[:10]}...") # Show first 10 keys
if context:
print(f"✅ [Post-Processing] Found context for key: {context_key}")
try:
original_search = context.get("original_search_result", {})
print(f"🔍 [Post-Processing] original_search keys: {list(original_search.keys())}")
spotify_clean_title = original_search.get('spotify_clean_title')
track_number = original_search.get('track_number')
print(f"🔍 [Post-Processing] spotify_clean_title: '{spotify_clean_title}', track_number: {track_number}")
if spotify_clean_title and track_number:
# Generate expected final filename that stream processor would create
# Pattern: f"{track_number:02d} - {clean_title}.flac"
sanitized_title = spotify_clean_title.replace('/', '_').replace('\\', '_').replace(':', '_').replace('*', '_').replace('?', '_').replace('"', '_').replace('<', '_').replace('>', '_').replace('|', '_')
expected_final_filename = f"{track_number:02d} - {sanitized_title}.flac"
print(f"🎯 [Post-Processing] Generated expected final filename: {expected_final_filename}")
else:
print(f"❌ [Post-Processing] Missing required data - spotify_clean_title: {bool(spotify_clean_title)}, track_number: {bool(track_number)}")
except Exception as e:
print(f"⚠️ [Post-Processing] Error generating expected filename: {e}")
import traceback
traceback.print_exc()
else:
print(f"❌ [Post-Processing] No context found for key: {context_key}")
# Try to find similar keys to debug the issue
similar_keys = [k for k in matched_downloads_context.keys() if os.path.basename(task_filename) in k]
if similar_keys:
print(f"🔍 [Post-Processing] Similar keys found: {similar_keys}")
else:
print(f"🔍 [Post-Processing] No similar keys found containing '{os.path.basename(task_filename)}'")
# Show a sample of what keys actually exist
sample_keys = list(matched_downloads_context.keys())[:5]
print(f"🔍 [Post-Processing] Sample of existing keys: {sample_keys}")
# RESILIENT FILE-FINDING LOOP: Try up to 3 times with delays
found_file = None
file_location = None
for retry_count in range(3):
print(f"🔍 [Post-Processing] Attempt {retry_count + 1}/3 to find file: {os.path.basename(task_filename)}")
found_file = _find_completed_file_robust(download_dir, task_filename)
# First try with original filename
found_file, file_location = _find_completed_file_robust(download_dir, task_filename, transfer_dir)
# If not found and we have an expected final filename, try that in transfer folder
if not found_file and expected_final_filename:
print(f"🔍 [Post-Processing] Trying with expected final filename: {expected_final_filename}")
found_result = _find_completed_file_robust(transfer_dir, expected_final_filename)
if found_result and found_result[0]:
found_file, file_location = found_result[0], 'transfer'
print(f"✅ [Post-Processing] Found file with expected final filename: {found_file}")
if found_file:
print(f"✅ [Post-Processing] Found file after {retry_count + 1} attempts: {found_file}")
print(f"✅ [Post-Processing] Found file after {retry_count + 1} attempts in {file_location}: {found_file}")
break
else:
if retry_count < 2: # Don't sleep on final attempt
@ -5480,7 +5690,23 @@ def _run_post_processing_worker(task_id, batch_id):
_on_download_completed(batch_id, task_id, success=False)
return
# File found - now attempt post-processing
# Handle file found in transfer folder - already completed by stream processor
if file_location == 'transfer':
print(f"🎯 [Post-Processing] File found in transfer folder - already completed by stream processor: {found_file}")
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'completed'
# Clean up context now that both stream processor and verification worker are done
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
print(f"🗑️ [Verification] Cleaned up context after successful verification: {context_key}")
_on_download_completed(batch_id, task_id, success=True)
return
# File found in downloads folder - attempt post-processing
try:
# Create context for post-processing (similar to existing matched download logic)
context_key = f"{task_username}::{os.path.basename(task_filename)}"
@ -5499,6 +5725,13 @@ def _run_post_processing_worker(task_id, batch_id):
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'completed'
# Clean up context if it exists (might be leftover from stream processor)
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
print(f"🗑️ [Verification] Cleaned up leftover context: {context_key}")
# Call completion callback since there's no other post-processing to handle it
_on_download_completed(batch_id, task_id, success=True)

Loading…
Cancel
Save