"Fix intermittent deadlock in download monitor that freezes entire download pipeline

pull/153/head
Broque Thomas 3 months ago
parent 1d33a37eb2
commit cc5b13dded

@ -542,28 +542,32 @@ class WebUIDownloadMonitor:
def _check_all_downloads(self):
"""Check all active downloads for timeouts and failures"""
current_time = time.time()
# Get live transfer data from slskd
live_transfers_lookup = self._get_live_transfers()
# Track tasks with exhausted retries to handle after releasing lock
exhausted_tasks = [] # List of (batch_id, task_id) tuples
# Track completed downloads to handle after releasing lock (prevents deadlock)
completed_tasks = [] # List of (batch_id, task_id) tuples
# Track deferred operations (network calls, nested locks) to run after releasing tasks_lock
deferred_ops = []
with tasks_lock:
# Check all monitored batches for timeouts and errors
for batch_id in list(self.monitored_batches):
if batch_id not in download_batches:
self.monitored_batches.discard(batch_id)
continue
for task_id in download_batches[batch_id].get('queue', []):
task = download_tasks.get(task_id)
if not task or task['status'] not in ['downloading', 'queued']:
continue
# Check for timeouts and errors - retries handled directly in _should_retry_task
# If _should_retry_task returns True, it means retries were exhausted
retry_exhausted = self._should_retry_task(task_id, task, live_transfers_lookup, current_time)
retry_exhausted = self._should_retry_task(task_id, task, live_transfers_lookup, current_time, deferred_ops)
# Collect exhausted tasks to handle outside lock (prevents deadlock)
if retry_exhausted:
exhausted_tasks.append((batch_id, task_id))
@ -571,18 +575,52 @@ class WebUIDownloadMonitor:
# ENHANCED: Check for successful completions (especially YouTube)
task_filename = task.get('filename') or task.get('track_info', {}).get('filename')
task_username = task.get('username') or task.get('track_info', {}).get('username')
if task_filename and task_username:
lookup_key = f"{task_username}::{extract_filename(task_filename)}"
live_info = live_transfers_lookup.get(lookup_key)
if live_info:
state = live_info.get('state', '')
# Trigger post-processing if download is completed but still marked as downloading locally
# 'Completed' is used by YouTubeClient, 'Succeeded' by Soulseek
if state in ['Completed', 'Succeeded'] and task['status'] == 'downloading':
print(f"✅ Monitor detected completed download for {task_id} ({state}) - triggering post-processing")
_on_download_completed(batch_id, task_id, success=True)
print(f"✅ Monitor detected completed download for {task_id} ({state}) - deferring completion to outside lock")
# CRITICAL: Collect for handling outside the lock to prevent deadlock.
# _on_download_completed acquires tasks_lock which is non-reentrant.
completed_tasks.append((batch_id, task_id))
# ---- All work below runs WITHOUT tasks_lock held ----
# Execute deferred operations from _should_retry_task (network calls, nested locks)
for op in deferred_ops:
try:
if op[0] == 'cancel_download':
_, download_id, username = op
print(f"🚫 [Deferred] Cancelling download: {download_id} from {username}")
run_async(soulseek_client.cancel_download(download_id, username, remove=True))
print(f"✅ [Deferred] Successfully cancelled download {download_id}")
elif op[0] == 'cleanup_orphan':
_, context_key = op
with matched_context_lock:
matched_downloads_context.pop(context_key, None)
print(f"🧹 [Deferred] Cleaned up orphaned download context: {context_key}")
elif op[0] == 'restart_worker':
_, task_id, batch_id = op
print(f"🚀 [Deferred] Restarting worker for task {task_id}")
missing_download_executor.submit(_download_track_worker, task_id, batch_id)
print(f"✅ [Deferred] Successfully restarted worker for task {task_id}")
except Exception as e:
print(f"⚠️ [Deferred] Error executing deferred operation {op[0]}: {e}")
# Handle completed downloads outside the lock to prevent deadlock
# (_on_download_completed acquires tasks_lock internally)
for batch_id, task_id in completed_tasks:
try:
print(f"✅ [Monitor] Triggering post-processing for completed task {task_id}")
_on_download_completed(batch_id, task_id, success=True)
except Exception as e:
print(f"❌ [Monitor] Error handling completed task {task_id}: {e}")
# Handle exhausted retry tasks outside the lock to prevent deadlock
for batch_id, task_id in exhausted_tasks:
try:
@ -590,7 +628,7 @@ class WebUIDownloadMonitor:
_on_download_completed(batch_id, task_id, success=False)
except Exception as e:
print(f"❌ [Monitor] Error handling exhausted task {task_id}: {e}")
# ENHANCED: Add worker count validation to detect ghost workers
self._validate_worker_counts()
@ -649,52 +687,55 @@ class WebUIDownloadMonitor:
print(f"⚠️ Monitor: Could not fetch live transfers: {e}")
return {}
def _should_retry_task(self, task_id, task, live_transfers_lookup, current_time):
"""Determine if a task should be retried due to timeout (matches GUI logic)"""
def _should_retry_task(self, task_id, task, live_transfers_lookup, current_time, deferred_ops):
"""
Determine if a task should be retried due to timeout (matches GUI logic).
IMPORTANT: This runs while tasks_lock is held. All network calls (slskd API)
and nested lock acquisitions (matched_context_lock) are collected into deferred_ops
to be executed AFTER releasing tasks_lock. This prevents deadlocks and long lock holds.
Returns True if retries are exhausted and _on_download_completed should be called outside the lock.
"""
ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {}
task_filename = task.get('filename') or ti.get('filename')
task_username = task.get('username') or ti.get('username')
if not task_filename or not task_username:
return False
lookup_key = f"{task_username}::{extract_filename(task_filename)}"
live_info = live_transfers_lookup.get(lookup_key)
if not live_info:
# Task not in live transfers but status is downloading/queued - likely stuck
if current_time - task.get('status_change_time', current_time) > 90:
return True
return False
state_str = live_info.get('state', '')
progress = live_info.get('percentComplete', 0)
# IMMEDIATE ERROR RETRY: Check for errored/rejected/timed-out downloads first (no timeout needed)
if 'Errored' in state_str or 'Failed' in state_str or 'Rejected' in state_str or 'TimedOut' in state_str:
retry_count = task.get('error_retry_count', 0)
last_retry = task.get('last_error_retry_time', 0)
# Don't retry too frequently (wait at least 5 seconds between error retries)
# Don't retry too frequently (wait at least 5 seconds between error retries)
if retry_count < 3 and (current_time - last_retry) > 5: # Max 3 error retry attempts
print(f"🚨 Task errored (state: {state_str}) - immediate retry {retry_count + 1}/3")
task['error_retry_count'] = retry_count + 1
task['last_error_retry_time'] = current_time
# CRITICAL: Cancel the errored download in slskd before retry
_ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {}
username = task.get('username') or _ti.get('username')
filename = task.get('filename') or _ti.get('filename')
download_id = task.get('download_id')
# Defer slskd cancel to outside the lock
if username and download_id:
try:
print(f"🚫 Cancelling errored download: {download_id} from {username}")
run_async(soulseek_client.cancel_download(download_id, username, remove=True))
print(f"✅ Successfully cancelled errored download {download_id}")
except Exception as cancel_error:
print(f"⚠️ Warning: Failed to cancel errored download {download_id}: {cancel_error}")
deferred_ops.append(('cancel_download', download_id, username))
# Mark current source as used to prevent retry loops
if username and filename:
used_sources = task.get('used_sources', set())
@ -703,13 +744,11 @@ class WebUIDownloadMonitor:
task['used_sources'] = used_sources
print(f"🚫 Marked errored source as used: {source_key}")
# Mark old download as orphaned so we can clean it up if it completes later
# Defer orphan cleanup to outside the lock (needs matched_context_lock)
if username and filename:
old_context_key = f"{username}::{extract_filename(filename)}"
_orphaned_download_keys.add(old_context_key)
with matched_context_lock:
matched_downloads_context.pop(old_context_key, None)
print(f"🧹 Marked orphaned download for cleanup: {old_context_key}")
deferred_ops.append(('cleanup_orphan', old_context_key))
# Clear download info since we cancelled it
task.pop('download_id', None)
@ -722,18 +761,11 @@ class WebUIDownloadMonitor:
task.pop('downloading_start_time', None)
task['status_change_time'] = current_time
print(f"🔄 Task {task.get('track_info', {}).get('name', 'Unknown')} reset for error retry")
# CRITICAL: Immediately restart worker for error retry - don't rely on normal queue processing
# Defer worker restart to outside the lock
batch_id = task.get('batch_id')
if task_id and batch_id:
try:
print(f"🚀 [Error Retry] Immediately restarting worker for task {task_id}")
missing_download_executor.submit(_download_track_worker, task_id, batch_id)
print(f"✅ [Error Retry] Successfully restarted worker for task {task_id}")
except Exception as restart_error:
print(f"❌ [Error Retry] Failed to restart worker for task {task_id}: {restart_error}")
task['status'] = 'failed'
task['error_message'] = f'Failed to restart worker: {restart_error}'
deferred_ops.append(('restart_worker', task_id, batch_id))
return False
elif retry_count < 3:
# Wait a bit before next error retry
@ -746,16 +778,14 @@ class WebUIDownloadMonitor:
print(f"❌ Task failed after 3 error retry attempts")
task['status'] = 'failed'
task['error_message'] = f'Soulseek transfer errored 3 times for "{track_label}"{sources_str} — all sources failed or became unavailable'
# CRITICAL: Notify batch manager so track is added to permanently_failed_tracks
batch_id = task.get('batch_id')
if batch_id:
print(f"📋 [Retry Exhausted] Notifying batch manager of permanent failure for task {task_id}")
# Release lock before calling completion to prevent deadlock
# The completion callback will re-acquire the lock
return True # Signal that we need to call completion outside the lock
return False
# Check for queued timeout (90 seconds like GUI)
elif 'Queued' in state_str or task['status'] == 'queued':
if 'queued_start_time' not in task:
@ -763,7 +793,7 @@ class WebUIDownloadMonitor:
return False
else:
queue_time = current_time - task['queued_start_time']
# Use context-aware timeouts like GUI:
# - 15 seconds for artist album downloads (streaming context)
# - 90 seconds for background playlist downloads
@ -781,20 +811,15 @@ class WebUIDownloadMonitor:
task['stuck_retry_count'] = retry_count + 1
task['last_retry_time'] = current_time
# CRITICAL: Cancel the stuck download in slskd before retry
_ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {}
username = task.get('username') or _ti.get('username')
filename = task.get('filename') or _ti.get('filename')
download_id = task.get('download_id')
# Defer slskd cancel to outside the lock
if username and download_id:
try:
print(f"🚫 Cancelling stuck queued download: {download_id} from {username}")
run_async(soulseek_client.cancel_download(download_id, username, remove=True))
print(f"✅ Successfully cancelled stuck download {download_id}")
except Exception as cancel_error:
print(f"⚠️ Warning: Failed to cancel stuck download {download_id}: {cancel_error}")
deferred_ops.append(('cancel_download', download_id, username))
# UNIFIED RETRY LOGIC: Handle timeout retry exactly like error retry
# Mark current source as used to prevent retry loops
if username and filename:
@ -804,13 +829,11 @@ class WebUIDownloadMonitor:
task['used_sources'] = used_sources
print(f"🚫 Marked timeout source as used: {source_key}")
# Mark old download as orphaned so we can clean it up if it completes later
# Defer orphan cleanup to outside the lock (needs matched_context_lock)
if username and filename:
old_context_key = f"{username}::{extract_filename(filename)}"
_orphaned_download_keys.add(old_context_key)
with matched_context_lock:
matched_downloads_context.pop(old_context_key, None)
print(f"🧹 Marked orphaned download for cleanup: {old_context_key}")
deferred_ops.append(('cleanup_orphan', old_context_key))
# Clear download info since we cancelled it
task.pop('download_id', None)
@ -823,18 +846,11 @@ class WebUIDownloadMonitor:
task.pop('downloading_start_time', None)
task['status_change_time'] = current_time
print(f"🔄 Task {task.get('track_info', {}).get('name', 'Unknown')} reset for timeout retry")
# CRITICAL: Immediately restart worker for timeout retry - don't rely on normal queue processing
# Defer worker restart to outside the lock
batch_id = task.get('batch_id')
if task_id and batch_id:
try:
print(f"🚀 [Timeout Retry] Immediately restarting worker for task {task_id}")
missing_download_executor.submit(_download_track_worker, task_id, batch_id)
print(f"✅ [Timeout Retry] Successfully restarted worker for task {task_id}")
except Exception as restart_error:
print(f"❌ [Timeout Retry] Failed to restart worker for task {task_id}: {restart_error}")
task['status'] = 'failed'
task['error_message'] = f'Failed to restart worker: {restart_error}'
deferred_ops.append(('restart_worker', task_id, batch_id))
return False
elif retry_count < 3:
# Wait longer before next retry
@ -850,7 +866,7 @@ class WebUIDownloadMonitor:
# Clear timers to prevent further retry loops
task.pop('queued_start_time', None)
task.pop('downloading_start_time', None)
# CRITICAL: Notify batch manager so track is added to permanently_failed_tracks
batch_id = task.get('batch_id')
if batch_id:
@ -882,20 +898,15 @@ class WebUIDownloadMonitor:
task['stuck_retry_count'] = retry_count + 1
task['last_retry_time'] = current_time
# CRITICAL: Cancel the stuck download in slskd before retry
_ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {}
username = task.get('username') or _ti.get('username')
filename = task.get('filename') or _ti.get('filename')
download_id = task.get('download_id')
# Defer slskd cancel to outside the lock
if username and download_id:
try:
print(f"🚫 Cancelling stuck 0% download: {download_id} from {username}")
run_async(soulseek_client.cancel_download(download_id, username, remove=True))
print(f"✅ Successfully cancelled stuck 0% download {download_id}")
except Exception as cancel_error:
print(f"⚠️ Warning: Failed to cancel stuck 0% download {download_id}: {cancel_error}")
deferred_ops.append(('cancel_download', download_id, username))
# UNIFIED RETRY LOGIC: Handle 0% timeout retry exactly like error retry
# Mark current source as used to prevent retry loops
if username and filename:
@ -905,13 +916,11 @@ class WebUIDownloadMonitor:
task['used_sources'] = used_sources
print(f"🚫 Marked 0% progress source as used: {source_key}")
# Mark old download as orphaned so we can clean it up if it completes later
# Defer orphan cleanup to outside the lock (needs matched_context_lock)
if username and filename:
old_context_key = f"{username}::{extract_filename(filename)}"
_orphaned_download_keys.add(old_context_key)
with matched_context_lock:
matched_downloads_context.pop(old_context_key, None)
print(f"🧹 Marked orphaned download for cleanup: {old_context_key}")
deferred_ops.append(('cleanup_orphan', old_context_key))
# Clear download info since we cancelled it
task.pop('download_id', None)
@ -924,18 +933,11 @@ class WebUIDownloadMonitor:
task.pop('downloading_start_time', None)
task['status_change_time'] = current_time
print(f"🔄 Task {task.get('track_info', {}).get('name', 'Unknown')} reset for 0% retry")
# CRITICAL: Immediately restart worker for 0% retry - don't rely on normal queue processing
# Defer worker restart to outside the lock
batch_id = task.get('batch_id')
if task_id and batch_id:
try:
print(f"🚀 [0% Retry] Immediately restarting worker for task {task_id}")
missing_download_executor.submit(_download_track_worker, task_id, batch_id)
print(f"✅ [0% Retry] Successfully restarted worker for task {task_id}")
except Exception as restart_error:
print(f"❌ [0% Retry] Failed to restart worker for task {task_id}: {restart_error}")
task['status'] = 'failed'
task['error_message'] = f'Failed to restart worker: {restart_error}'
deferred_ops.append(('restart_worker', task_id, batch_id))
return False
elif retry_count < 3:
# Wait longer before next retry
@ -972,21 +974,23 @@ class WebUIDownloadMonitor:
This prevents the modal from showing wrong worker counts permanently.
"""
try:
batches_needing_workers = []
with tasks_lock:
for batch_id in list(self.monitored_batches):
if batch_id not in download_batches:
continue
batch = download_batches[batch_id]
reported_active = batch['active_count']
max_concurrent = batch['max_concurrent']
queue = batch.get('queue', [])
queue_index = batch.get('queue_index', 0)
# Count actually active tasks based on status
actually_active = 0
orphaned_tasks = []
for task_id in queue:
if task_id in download_tasks:
task_status = download_tasks[task_id]['status']
@ -995,30 +999,32 @@ class WebUIDownloadMonitor:
elif task_status in ['failed', 'complete', 'cancelled', 'not_found'] and task_id in queue[queue_index:]:
# These are orphaned tasks - they're done but still in active queue
orphaned_tasks.append(task_id)
# Check for discrepancies
if reported_active != actually_active or orphaned_tasks:
print(f"🔍 [Worker Validation] Batch {batch_id}: reported={reported_active}, actual={actually_active}, orphaned={len(orphaned_tasks)}")
if orphaned_tasks:
print(f"🧹 [Worker Validation] Found {len(orphaned_tasks)} orphaned tasks to cleanup")
# Fix the active count if it's wrong
if reported_active != actually_active:
old_count = batch['active_count']
batch['active_count'] = actually_active
print(f"✅ [Worker Validation] Fixed active count: {old_count}{actually_active}")
# If we freed up slots and have more work, try to start new workers
# Defer starting workers to outside the lock
if actually_active < max_concurrent and queue_index < len(queue):
print(f"🔄 [Worker Validation] Starting replacement workers")
# Release lock temporarily to avoid deadlock
tasks_lock.release()
try:
_start_next_batch_of_downloads(batch_id)
finally:
tasks_lock.acquire()
batches_needing_workers.append(batch_id)
# Start replacement workers outside the lock
for batch_id in batches_needing_workers:
try:
print(f"🔄 [Worker Validation] Starting replacement workers for {batch_id}")
_start_next_batch_of_downloads(batch_id)
except Exception as e:
print(f"❌ [Worker Validation] Error starting workers for {batch_id}: {e}")
except Exception as validation_error:
print(f"❌ Error in worker count validation: {validation_error}")
@ -1034,6 +1040,10 @@ def validate_and_heal_batch_states():
import time
current_time = time.time()
# Collect work to do outside the lock
batches_needing_workers = [] # batch_ids that need _start_next_batch_of_downloads
batches_needing_completion_check = [] # batch_ids that need _check_batch_completion_v2
with tasks_lock:
healed_batches = []
batches_to_cleanup = []
@ -1057,11 +1067,11 @@ def validate_and_heal_batch_states():
print(f"🧹 [Auto-Cleanup] Removing stale completed batch {batch_id} (completed {time_since_completion:.0f}s ago)")
batches_to_cleanup.append(batch_id)
continue # Skip other healing logic for this batch
# Count actually active tasks
actually_active = 0
orphaned_tasks = []
for task_id in queue:
if task_id in download_tasks:
task_status = download_tasks[task_id]['status']
@ -1072,39 +1082,25 @@ def validate_and_heal_batch_states():
else:
# Task in queue but not in download_tasks dict
orphaned_tasks.append(task_id)
# Check for inconsistencies
if active_count != actually_active:
print(f"🔧 [Batch Healing] {batch_id}: fixing active count {active_count}{actually_active}")
batch_data['active_count'] = actually_active
healed_batches.append(batch_id)
# If we freed up slots, try to start more workers
# If we freed up slots, defer starting workers to outside the lock
if actually_active < batch_data.get('max_concurrent', 3):
queue_index = batch_data.get('queue_index', 0)
if queue_index < len(queue):
print(f"🔄 [Batch Healing] Starting replacement workers for {batch_id}")
# Release lock temporarily to avoid deadlock
tasks_lock.release()
try:
_start_next_batch_of_downloads(batch_id)
finally:
tasks_lock.acquire()
batches_needing_workers.append(batch_id)
# Clean up orphaned tasks that are blocking progress
if orphaned_tasks and phase == 'downloading':
print(f"🧹 [Batch Healing] Found {len(orphaned_tasks)} orphaned tasks in active batch {batch_id}")
batches_needing_completion_check.append(batch_id)
# Trigger completion check to handle orphaned tasks
# Release lock temporarily to avoid deadlock
tasks_lock.release()
try:
print(f"🔄 [Batch Healing] Triggering completion check for batch with orphaned tasks")
_check_batch_completion_v2(batch_id)
finally:
tasks_lock.acquire()
# Cleanup stale batches outside the iteration loop
# Cleanup stale batches inside the lock (safe - just dict mutations)
for batch_id in batches_to_cleanup:
task_ids_to_remove = download_batches[batch_id].get('queue', [])
del download_batches[batch_id]
@ -1118,6 +1114,24 @@ def validate_and_heal_batch_states():
if healed_batches:
print(f"✅ [Batch Healing] Healed {len(healed_batches)} batches: {healed_batches}")
# ---- All work below runs WITHOUT tasks_lock held ----
# Start replacement workers for healed batches
for batch_id in batches_needing_workers:
try:
print(f"🔄 [Batch Healing] Starting replacement workers for {batch_id}")
_start_next_batch_of_downloads(batch_id)
except Exception as e:
print(f"❌ [Batch Healing] Error starting workers for {batch_id}: {e}")
# Trigger completion checks for batches with orphaned tasks
for batch_id in batches_needing_completion_check:
try:
print(f"🔄 [Batch Healing] Triggering completion check for batch with orphaned tasks")
_check_batch_completion_v2(batch_id)
except Exception as e:
print(f"❌ [Batch Healing] Error checking completion for {batch_id}: {e}")
except Exception as healing_error:
print(f"❌ [Batch Healing] Error during validation: {healing_error}")

Loading…
Cancel
Save