diff --git a/web_server.py b/web_server.py index f9128494..6f82c913 100644 --- a/web_server.py +++ b/web_server.py @@ -542,28 +542,32 @@ class WebUIDownloadMonitor: def _check_all_downloads(self): """Check all active downloads for timeouts and failures""" current_time = time.time() - + # Get live transfer data from slskd live_transfers_lookup = self._get_live_transfers() - + # Track tasks with exhausted retries to handle after releasing lock exhausted_tasks = [] # List of (batch_id, task_id) tuples - + # Track completed downloads to handle after releasing lock (prevents deadlock) + completed_tasks = [] # List of (batch_id, task_id) tuples + # Track deferred operations (network calls, nested locks) to run after releasing tasks_lock + deferred_ops = [] + with tasks_lock: # Check all monitored batches for timeouts and errors for batch_id in list(self.monitored_batches): if batch_id not in download_batches: self.monitored_batches.discard(batch_id) continue - + for task_id in download_batches[batch_id].get('queue', []): task = download_tasks.get(task_id) if not task or task['status'] not in ['downloading', 'queued']: continue - + # Check for timeouts and errors - retries handled directly in _should_retry_task # If _should_retry_task returns True, it means retries were exhausted - retry_exhausted = self._should_retry_task(task_id, task, live_transfers_lookup, current_time) + retry_exhausted = self._should_retry_task(task_id, task, live_transfers_lookup, current_time, deferred_ops) # Collect exhausted tasks to handle outside lock (prevents deadlock) if retry_exhausted: exhausted_tasks.append((batch_id, task_id)) @@ -571,18 +575,52 @@ class WebUIDownloadMonitor: # ENHANCED: Check for successful completions (especially YouTube) task_filename = task.get('filename') or task.get('track_info', {}).get('filename') task_username = task.get('username') or task.get('track_info', {}).get('username') - + if task_filename and task_username: lookup_key = f"{task_username}::{extract_filename(task_filename)}" live_info = live_transfers_lookup.get(lookup_key) - + if live_info: state = live_info.get('state', '') # Trigger post-processing if download is completed but still marked as downloading locally # 'Completed' is used by YouTubeClient, 'Succeeded' by Soulseek if state in ['Completed', 'Succeeded'] and task['status'] == 'downloading': - print(f"โœ… Monitor detected completed download for {task_id} ({state}) - triggering post-processing") - _on_download_completed(batch_id, task_id, success=True) + print(f"โœ… Monitor detected completed download for {task_id} ({state}) - deferring completion to outside lock") + # CRITICAL: Collect for handling outside the lock to prevent deadlock. + # _on_download_completed acquires tasks_lock which is non-reentrant. + completed_tasks.append((batch_id, task_id)) + + # ---- All work below runs WITHOUT tasks_lock held ---- + + # Execute deferred operations from _should_retry_task (network calls, nested locks) + for op in deferred_ops: + try: + if op[0] == 'cancel_download': + _, download_id, username = op + print(f"๐Ÿšซ [Deferred] Cancelling download: {download_id} from {username}") + run_async(soulseek_client.cancel_download(download_id, username, remove=True)) + print(f"โœ… [Deferred] Successfully cancelled download {download_id}") + elif op[0] == 'cleanup_orphan': + _, context_key = op + with matched_context_lock: + matched_downloads_context.pop(context_key, None) + print(f"๐Ÿงน [Deferred] Cleaned up orphaned download context: {context_key}") + elif op[0] == 'restart_worker': + _, task_id, batch_id = op + print(f"๐Ÿš€ [Deferred] Restarting worker for task {task_id}") + missing_download_executor.submit(_download_track_worker, task_id, batch_id) + print(f"โœ… [Deferred] Successfully restarted worker for task {task_id}") + except Exception as e: + print(f"โš ๏ธ [Deferred] Error executing deferred operation {op[0]}: {e}") + + # Handle completed downloads outside the lock to prevent deadlock + # (_on_download_completed acquires tasks_lock internally) + for batch_id, task_id in completed_tasks: + try: + print(f"โœ… [Monitor] Triggering post-processing for completed task {task_id}") + _on_download_completed(batch_id, task_id, success=True) + except Exception as e: + print(f"โŒ [Monitor] Error handling completed task {task_id}: {e}") # Handle exhausted retry tasks outside the lock to prevent deadlock for batch_id, task_id in exhausted_tasks: try: @@ -590,7 +628,7 @@ class WebUIDownloadMonitor: _on_download_completed(batch_id, task_id, success=False) except Exception as e: print(f"โŒ [Monitor] Error handling exhausted task {task_id}: {e}") - + # ENHANCED: Add worker count validation to detect ghost workers self._validate_worker_counts() @@ -649,52 +687,55 @@ class WebUIDownloadMonitor: print(f"โš ๏ธ Monitor: Could not fetch live transfers: {e}") return {} - def _should_retry_task(self, task_id, task, live_transfers_lookup, current_time): - """Determine if a task should be retried due to timeout (matches GUI logic)""" + def _should_retry_task(self, task_id, task, live_transfers_lookup, current_time, deferred_ops): + """ + Determine if a task should be retried due to timeout (matches GUI logic). + + IMPORTANT: This runs while tasks_lock is held. All network calls (slskd API) + and nested lock acquisitions (matched_context_lock) are collected into deferred_ops + to be executed AFTER releasing tasks_lock. This prevents deadlocks and long lock holds. + + Returns True if retries are exhausted and _on_download_completed should be called outside the lock. + """ ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {} task_filename = task.get('filename') or ti.get('filename') task_username = task.get('username') or ti.get('username') - + if not task_filename or not task_username: return False - + lookup_key = f"{task_username}::{extract_filename(task_filename)}" live_info = live_transfers_lookup.get(lookup_key) - + if not live_info: # Task not in live transfers but status is downloading/queued - likely stuck if current_time - task.get('status_change_time', current_time) > 90: return True return False - + state_str = live_info.get('state', '') progress = live_info.get('percentComplete', 0) - + # IMMEDIATE ERROR RETRY: Check for errored/rejected/timed-out downloads first (no timeout needed) if 'Errored' in state_str or 'Failed' in state_str or 'Rejected' in state_str or 'TimedOut' in state_str: retry_count = task.get('error_retry_count', 0) last_retry = task.get('last_error_retry_time', 0) - - # Don't retry too frequently (wait at least 5 seconds between error retries) + + # Don't retry too frequently (wait at least 5 seconds between error retries) if retry_count < 3 and (current_time - last_retry) > 5: # Max 3 error retry attempts print(f"๐Ÿšจ Task errored (state: {state_str}) - immediate retry {retry_count + 1}/3") task['error_retry_count'] = retry_count + 1 task['last_error_retry_time'] = current_time - - # CRITICAL: Cancel the errored download in slskd before retry + _ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {} username = task.get('username') or _ti.get('username') filename = task.get('filename') or _ti.get('filename') download_id = task.get('download_id') + # Defer slskd cancel to outside the lock if username and download_id: - try: - print(f"๐Ÿšซ Cancelling errored download: {download_id} from {username}") - run_async(soulseek_client.cancel_download(download_id, username, remove=True)) - print(f"โœ… Successfully cancelled errored download {download_id}") - except Exception as cancel_error: - print(f"โš ๏ธ Warning: Failed to cancel errored download {download_id}: {cancel_error}") - + deferred_ops.append(('cancel_download', download_id, username)) + # Mark current source as used to prevent retry loops if username and filename: used_sources = task.get('used_sources', set()) @@ -703,13 +744,11 @@ class WebUIDownloadMonitor: task['used_sources'] = used_sources print(f"๐Ÿšซ Marked errored source as used: {source_key}") - # Mark old download as orphaned so we can clean it up if it completes later + # Defer orphan cleanup to outside the lock (needs matched_context_lock) if username and filename: old_context_key = f"{username}::{extract_filename(filename)}" _orphaned_download_keys.add(old_context_key) - with matched_context_lock: - matched_downloads_context.pop(old_context_key, None) - print(f"๐Ÿงน Marked orphaned download for cleanup: {old_context_key}") + deferred_ops.append(('cleanup_orphan', old_context_key)) # Clear download info since we cancelled it task.pop('download_id', None) @@ -722,18 +761,11 @@ class WebUIDownloadMonitor: task.pop('downloading_start_time', None) task['status_change_time'] = current_time print(f"๐Ÿ”„ Task {task.get('track_info', {}).get('name', 'Unknown')} reset for error retry") - - # CRITICAL: Immediately restart worker for error retry - don't rely on normal queue processing + + # Defer worker restart to outside the lock batch_id = task.get('batch_id') if task_id and batch_id: - try: - print(f"๐Ÿš€ [Error Retry] Immediately restarting worker for task {task_id}") - missing_download_executor.submit(_download_track_worker, task_id, batch_id) - print(f"โœ… [Error Retry] Successfully restarted worker for task {task_id}") - except Exception as restart_error: - print(f"โŒ [Error Retry] Failed to restart worker for task {task_id}: {restart_error}") - task['status'] = 'failed' - task['error_message'] = f'Failed to restart worker: {restart_error}' + deferred_ops.append(('restart_worker', task_id, batch_id)) return False elif retry_count < 3: # Wait a bit before next error retry @@ -746,16 +778,14 @@ class WebUIDownloadMonitor: print(f"โŒ Task failed after 3 error retry attempts") task['status'] = 'failed' task['error_message'] = f'Soulseek transfer errored 3 times for "{track_label}"{sources_str} โ€” all sources failed or became unavailable' - + # CRITICAL: Notify batch manager so track is added to permanently_failed_tracks batch_id = task.get('batch_id') if batch_id: print(f"๐Ÿ“‹ [Retry Exhausted] Notifying batch manager of permanent failure for task {task_id}") - # Release lock before calling completion to prevent deadlock - # The completion callback will re-acquire the lock return True # Signal that we need to call completion outside the lock return False - + # Check for queued timeout (90 seconds like GUI) elif 'Queued' in state_str or task['status'] == 'queued': if 'queued_start_time' not in task: @@ -763,7 +793,7 @@ class WebUIDownloadMonitor: return False else: queue_time = current_time - task['queued_start_time'] - + # Use context-aware timeouts like GUI: # - 15 seconds for artist album downloads (streaming context) # - 90 seconds for background playlist downloads @@ -781,20 +811,15 @@ class WebUIDownloadMonitor: task['stuck_retry_count'] = retry_count + 1 task['last_retry_time'] = current_time - # CRITICAL: Cancel the stuck download in slskd before retry _ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {} username = task.get('username') or _ti.get('username') filename = task.get('filename') or _ti.get('filename') download_id = task.get('download_id') - + + # Defer slskd cancel to outside the lock if username and download_id: - try: - print(f"๐Ÿšซ Cancelling stuck queued download: {download_id} from {username}") - run_async(soulseek_client.cancel_download(download_id, username, remove=True)) - print(f"โœ… Successfully cancelled stuck download {download_id}") - except Exception as cancel_error: - print(f"โš ๏ธ Warning: Failed to cancel stuck download {download_id}: {cancel_error}") - + deferred_ops.append(('cancel_download', download_id, username)) + # UNIFIED RETRY LOGIC: Handle timeout retry exactly like error retry # Mark current source as used to prevent retry loops if username and filename: @@ -804,13 +829,11 @@ class WebUIDownloadMonitor: task['used_sources'] = used_sources print(f"๐Ÿšซ Marked timeout source as used: {source_key}") - # Mark old download as orphaned so we can clean it up if it completes later + # Defer orphan cleanup to outside the lock (needs matched_context_lock) if username and filename: old_context_key = f"{username}::{extract_filename(filename)}" _orphaned_download_keys.add(old_context_key) - with matched_context_lock: - matched_downloads_context.pop(old_context_key, None) - print(f"๐Ÿงน Marked orphaned download for cleanup: {old_context_key}") + deferred_ops.append(('cleanup_orphan', old_context_key)) # Clear download info since we cancelled it task.pop('download_id', None) @@ -823,18 +846,11 @@ class WebUIDownloadMonitor: task.pop('downloading_start_time', None) task['status_change_time'] = current_time print(f"๐Ÿ”„ Task {task.get('track_info', {}).get('name', 'Unknown')} reset for timeout retry") - - # CRITICAL: Immediately restart worker for timeout retry - don't rely on normal queue processing + + # Defer worker restart to outside the lock batch_id = task.get('batch_id') if task_id and batch_id: - try: - print(f"๐Ÿš€ [Timeout Retry] Immediately restarting worker for task {task_id}") - missing_download_executor.submit(_download_track_worker, task_id, batch_id) - print(f"โœ… [Timeout Retry] Successfully restarted worker for task {task_id}") - except Exception as restart_error: - print(f"โŒ [Timeout Retry] Failed to restart worker for task {task_id}: {restart_error}") - task['status'] = 'failed' - task['error_message'] = f'Failed to restart worker: {restart_error}' + deferred_ops.append(('restart_worker', task_id, batch_id)) return False elif retry_count < 3: # Wait longer before next retry @@ -850,7 +866,7 @@ class WebUIDownloadMonitor: # Clear timers to prevent further retry loops task.pop('queued_start_time', None) task.pop('downloading_start_time', None) - + # CRITICAL: Notify batch manager so track is added to permanently_failed_tracks batch_id = task.get('batch_id') if batch_id: @@ -882,20 +898,15 @@ class WebUIDownloadMonitor: task['stuck_retry_count'] = retry_count + 1 task['last_retry_time'] = current_time - # CRITICAL: Cancel the stuck download in slskd before retry _ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {} username = task.get('username') or _ti.get('username') filename = task.get('filename') or _ti.get('filename') download_id = task.get('download_id') - + + # Defer slskd cancel to outside the lock if username and download_id: - try: - print(f"๐Ÿšซ Cancelling stuck 0% download: {download_id} from {username}") - run_async(soulseek_client.cancel_download(download_id, username, remove=True)) - print(f"โœ… Successfully cancelled stuck 0% download {download_id}") - except Exception as cancel_error: - print(f"โš ๏ธ Warning: Failed to cancel stuck 0% download {download_id}: {cancel_error}") - + deferred_ops.append(('cancel_download', download_id, username)) + # UNIFIED RETRY LOGIC: Handle 0% timeout retry exactly like error retry # Mark current source as used to prevent retry loops if username and filename: @@ -905,13 +916,11 @@ class WebUIDownloadMonitor: task['used_sources'] = used_sources print(f"๐Ÿšซ Marked 0% progress source as used: {source_key}") - # Mark old download as orphaned so we can clean it up if it completes later + # Defer orphan cleanup to outside the lock (needs matched_context_lock) if username and filename: old_context_key = f"{username}::{extract_filename(filename)}" _orphaned_download_keys.add(old_context_key) - with matched_context_lock: - matched_downloads_context.pop(old_context_key, None) - print(f"๐Ÿงน Marked orphaned download for cleanup: {old_context_key}") + deferred_ops.append(('cleanup_orphan', old_context_key)) # Clear download info since we cancelled it task.pop('download_id', None) @@ -924,18 +933,11 @@ class WebUIDownloadMonitor: task.pop('downloading_start_time', None) task['status_change_time'] = current_time print(f"๐Ÿ”„ Task {task.get('track_info', {}).get('name', 'Unknown')} reset for 0% retry") - - # CRITICAL: Immediately restart worker for 0% retry - don't rely on normal queue processing + + # Defer worker restart to outside the lock batch_id = task.get('batch_id') if task_id and batch_id: - try: - print(f"๐Ÿš€ [0% Retry] Immediately restarting worker for task {task_id}") - missing_download_executor.submit(_download_track_worker, task_id, batch_id) - print(f"โœ… [0% Retry] Successfully restarted worker for task {task_id}") - except Exception as restart_error: - print(f"โŒ [0% Retry] Failed to restart worker for task {task_id}: {restart_error}") - task['status'] = 'failed' - task['error_message'] = f'Failed to restart worker: {restart_error}' + deferred_ops.append(('restart_worker', task_id, batch_id)) return False elif retry_count < 3: # Wait longer before next retry @@ -972,21 +974,23 @@ class WebUIDownloadMonitor: This prevents the modal from showing wrong worker counts permanently. """ try: + batches_needing_workers = [] + with tasks_lock: for batch_id in list(self.monitored_batches): if batch_id not in download_batches: continue - + batch = download_batches[batch_id] reported_active = batch['active_count'] max_concurrent = batch['max_concurrent'] queue = batch.get('queue', []) queue_index = batch.get('queue_index', 0) - + # Count actually active tasks based on status actually_active = 0 orphaned_tasks = [] - + for task_id in queue: if task_id in download_tasks: task_status = download_tasks[task_id]['status'] @@ -995,30 +999,32 @@ class WebUIDownloadMonitor: elif task_status in ['failed', 'complete', 'cancelled', 'not_found'] and task_id in queue[queue_index:]: # These are orphaned tasks - they're done but still in active queue orphaned_tasks.append(task_id) - + # Check for discrepancies if reported_active != actually_active or orphaned_tasks: print(f"๐Ÿ” [Worker Validation] Batch {batch_id}: reported={reported_active}, actual={actually_active}, orphaned={len(orphaned_tasks)}") - + if orphaned_tasks: print(f"๐Ÿงน [Worker Validation] Found {len(orphaned_tasks)} orphaned tasks to cleanup") - + # Fix the active count if it's wrong if reported_active != actually_active: old_count = batch['active_count'] batch['active_count'] = actually_active print(f"โœ… [Worker Validation] Fixed active count: {old_count} โ†’ {actually_active}") - - # If we freed up slots and have more work, try to start new workers + + # Defer starting workers to outside the lock if actually_active < max_concurrent and queue_index < len(queue): - print(f"๐Ÿ”„ [Worker Validation] Starting replacement workers") - # Release lock temporarily to avoid deadlock - tasks_lock.release() - try: - _start_next_batch_of_downloads(batch_id) - finally: - tasks_lock.acquire() - + batches_needing_workers.append(batch_id) + + # Start replacement workers outside the lock + for batch_id in batches_needing_workers: + try: + print(f"๐Ÿ”„ [Worker Validation] Starting replacement workers for {batch_id}") + _start_next_batch_of_downloads(batch_id) + except Exception as e: + print(f"โŒ [Worker Validation] Error starting workers for {batch_id}: {e}") + except Exception as validation_error: print(f"โŒ Error in worker count validation: {validation_error}") @@ -1034,6 +1040,10 @@ def validate_and_heal_batch_states(): import time current_time = time.time() + # Collect work to do outside the lock + batches_needing_workers = [] # batch_ids that need _start_next_batch_of_downloads + batches_needing_completion_check = [] # batch_ids that need _check_batch_completion_v2 + with tasks_lock: healed_batches = [] batches_to_cleanup = [] @@ -1057,11 +1067,11 @@ def validate_and_heal_batch_states(): print(f"๐Ÿงน [Auto-Cleanup] Removing stale completed batch {batch_id} (completed {time_since_completion:.0f}s ago)") batches_to_cleanup.append(batch_id) continue # Skip other healing logic for this batch - + # Count actually active tasks actually_active = 0 orphaned_tasks = [] - + for task_id in queue: if task_id in download_tasks: task_status = download_tasks[task_id]['status'] @@ -1072,39 +1082,25 @@ def validate_and_heal_batch_states(): else: # Task in queue but not in download_tasks dict orphaned_tasks.append(task_id) - + # Check for inconsistencies if active_count != actually_active: print(f"๐Ÿ”ง [Batch Healing] {batch_id}: fixing active count {active_count} โ†’ {actually_active}") batch_data['active_count'] = actually_active healed_batches.append(batch_id) - - # If we freed up slots, try to start more workers + + # If we freed up slots, defer starting workers to outside the lock if actually_active < batch_data.get('max_concurrent', 3): queue_index = batch_data.get('queue_index', 0) if queue_index < len(queue): - print(f"๐Ÿ”„ [Batch Healing] Starting replacement workers for {batch_id}") - # Release lock temporarily to avoid deadlock - tasks_lock.release() - try: - _start_next_batch_of_downloads(batch_id) - finally: - tasks_lock.acquire() - + batches_needing_workers.append(batch_id) + # Clean up orphaned tasks that are blocking progress if orphaned_tasks and phase == 'downloading': print(f"๐Ÿงน [Batch Healing] Found {len(orphaned_tasks)} orphaned tasks in active batch {batch_id}") + batches_needing_completion_check.append(batch_id) - # Trigger completion check to handle orphaned tasks - # Release lock temporarily to avoid deadlock - tasks_lock.release() - try: - print(f"๐Ÿ”„ [Batch Healing] Triggering completion check for batch with orphaned tasks") - _check_batch_completion_v2(batch_id) - finally: - tasks_lock.acquire() - - # Cleanup stale batches outside the iteration loop + # Cleanup stale batches inside the lock (safe - just dict mutations) for batch_id in batches_to_cleanup: task_ids_to_remove = download_batches[batch_id].get('queue', []) del download_batches[batch_id] @@ -1118,6 +1114,24 @@ def validate_and_heal_batch_states(): if healed_batches: print(f"โœ… [Batch Healing] Healed {len(healed_batches)} batches: {healed_batches}") + + # ---- All work below runs WITHOUT tasks_lock held ---- + + # Start replacement workers for healed batches + for batch_id in batches_needing_workers: + try: + print(f"๐Ÿ”„ [Batch Healing] Starting replacement workers for {batch_id}") + _start_next_batch_of_downloads(batch_id) + except Exception as e: + print(f"โŒ [Batch Healing] Error starting workers for {batch_id}: {e}") + + # Trigger completion checks for batches with orphaned tasks + for batch_id in batches_needing_completion_check: + try: + print(f"๐Ÿ”„ [Batch Healing] Triggering completion check for batch with orphaned tasks") + _check_batch_completion_v2(batch_id) + except Exception as e: + print(f"โŒ [Batch Healing] Error checking completion for {batch_id}: {e}") except Exception as healing_error: print(f"โŒ [Batch Healing] Error during validation: {healing_error}")