diff --git a/web_server.py b/web_server.py index 4b3a2447..fe558135 100644 --- a/web_server.py +++ b/web_server.py @@ -829,13 +829,32 @@ def validate_and_heal_batch_states(): This is the server-side equivalent of the frontend's worker count validation. """ try: + import time + current_time = time.time() + with tasks_lock: healed_batches = [] - + batches_to_cleanup = [] + for batch_id, batch_data in list(download_batches.items()): active_count = batch_data.get('active_count', 0) queue = batch_data.get('queue', []) phase = batch_data.get('phase', 'unknown') + + # AUTO-CLEANUP: Remove completed batches after 5 minutes to prevent stale state + if phase in ['complete', 'error', 'cancelled']: + # Check if batch has a completion timestamp + completion_time = batch_data.get('completion_time') + if not completion_time: + # Set completion time if not set + batch_data['completion_time'] = current_time + else: + # Check if batch has been complete for >5 minutes + time_since_completion = current_time - completion_time + if time_since_completion > 300: # 5 minutes + print(f"๐Ÿงน [Auto-Cleanup] Removing stale completed batch {batch_id} (completed {time_since_completion:.0f}s ago)") + batches_to_cleanup.append(batch_id) + continue # Skip other healing logic for this batch # Count actually active tasks actually_active = 0 @@ -844,10 +863,13 @@ def validate_and_heal_batch_states(): for task_id in queue: if task_id in download_tasks: task_status = download_tasks[task_id]['status'] - if task_status in ['searching', 'downloading', 'queued']: + if task_status in ['searching', 'downloading', 'queued', 'post_processing']: actually_active += 1 - elif task_status in ['failed', 'complete', 'cancelled']: + elif task_status in ['failed', 'completed', 'cancelled']: orphaned_tasks.append(task_id) + else: + # Task in queue but not in download_tasks dict + orphaned_tasks.append(task_id) # Check for inconsistencies if active_count != actually_active: @@ -870,7 +892,28 @@ def validate_and_heal_batch_states(): # Clean up orphaned tasks that are blocking progress if orphaned_tasks and phase == 'downloading': print(f"๐Ÿงน [Batch Healing] Found {len(orphaned_tasks)} orphaned tasks in active batch {batch_id}") - + + # Trigger completion check to handle orphaned tasks + # Release lock temporarily to avoid deadlock + tasks_lock.release() + try: + print(f"๐Ÿ”„ [Batch Healing] Triggering completion check for batch with orphaned tasks") + _check_batch_completion_v2(batch_id) + finally: + tasks_lock.acquire() + + # Cleanup stale batches outside the iteration loop + for batch_id in batches_to_cleanup: + task_ids_to_remove = download_batches[batch_id].get('queue', []) + del download_batches[batch_id] + # Clean up associated tasks + for task_id in task_ids_to_remove: + if task_id in download_tasks: + del download_tasks[task_id] + + if batches_to_cleanup: + print(f"๐Ÿ—‘๏ธ [Auto-Cleanup] Removed {len(batches_to_cleanup)} stale completed batches") + if healed_batches: print(f"โœ… [Batch Healing] Healed {len(healed_batches)} batches: {healed_batches}") @@ -7580,16 +7623,17 @@ def is_watchlist_actually_scanning(): def start_wishlist_auto_processing(): """Start automatic wishlist processing with 1-minute initial delay.""" - global wishlist_auto_timer + global wishlist_auto_timer, wishlist_next_run_time print("๐Ÿš€ [Auto-Wishlist] Initializing automatic wishlist processing...") - + with wishlist_timer_lock: # Stop any existing timer to prevent duplicates if wishlist_auto_timer is not None: wishlist_auto_timer.cancel() - + print("๐Ÿ”„ Starting automatic wishlist processing system (1 minute initial delay)") + wishlist_next_run_time = time.time() + 60.0 # Set timestamp for countdown display wishlist_auto_timer = threading.Timer(60.0, _process_wishlist_automatically) # 1 minute wishlist_auto_timer.daemon = True wishlist_auto_timer.start() @@ -7597,7 +7641,7 @@ def start_wishlist_auto_processing(): def stop_wishlist_auto_processing(): """Stop automatic wishlist processing and cleanup timer.""" - global wishlist_auto_timer, wishlist_auto_processing, wishlist_auto_processing_timestamp + global wishlist_auto_timer, wishlist_auto_processing, wishlist_auto_processing_timestamp, wishlist_next_run_time with wishlist_timer_lock: if wishlist_auto_timer is not None: @@ -7607,6 +7651,7 @@ def stop_wishlist_auto_processing(): wishlist_auto_processing = False wishlist_auto_processing_timestamp = 0 + wishlist_next_run_time = 0 # Clear countdown timer def schedule_next_wishlist_processing(): """Schedule next automatic wishlist processing in 30 minutes.""" @@ -9741,9 +9786,10 @@ def _process_failed_tracks_to_wishlist_exact(batch_id): with tasks_lock: if batch_id in download_batches: download_batches[batch_id]['phase'] = 'complete' + download_batches[batch_id]['completion_time'] = time.time() # Track for auto-cleanup download_batches[batch_id]['wishlist_summary'] = { - 'tracks_added': 0, - 'errors': 1, + 'tracks_added': 0, + 'errors': 1, 'total_failed': 0, 'error_message': str(e) } @@ -9913,18 +9959,41 @@ def _on_download_completed(batch_id, task_id, success=True): no_active_workers = batch['active_count'] == 0 # Count actually finished tasks (completed, failed, or cancelled) - # CRITICAL: Don't include 'post_processing' as finished - it's still in progress! - # CRITICAL: Don't include 'searching' as finished - task is being retried! + # CRITICAL: Don't include 'post_processing' as finished - it's still in progress (unless stuck)! + # CRITICAL: Don't include 'searching' as finished - task is being retried (unless stuck)! finished_count = 0 retrying_count = 0 queue = batch.get('queue', []) + current_time = time.time() for task_id in queue: if task_id in download_tasks: - task_status = download_tasks[task_id]['status'] - if task_status in ['completed', 'failed', 'cancelled']: + task = download_tasks[task_id] + task_status = task['status'] + + # STUCK DETECTION: Force fail tasks that have been in transitional states too long + if task_status == 'searching': + task_age = current_time - task.get('status_change_time', current_time) + if task_age > 600: # 10 minutes + print(f"โฐ [Stuck Detection] Task {task_id} stuck in searching for {task_age:.0f}s - forcing failure") + task['status'] = 'failed' + task['error_message'] = f'Retry timeout after {task_age:.0f} seconds' + finished_count += 1 + else: + retrying_count += 1 + elif task_status == 'post_processing': + task_age = current_time - task.get('status_change_time', current_time) + if task_age > 300: # 5 minutes (post-processing should be fast) + print(f"โฐ [Stuck Detection] Task {task_id} stuck in post_processing for {task_age:.0f}s - forcing completion") + task['status'] = 'completed' # Assume it worked if file verification is taking too long + finished_count += 1 + else: + retrying_count += 1 + elif task_status in ['completed', 'failed', 'cancelled']: finished_count += 1 - elif task_status == 'searching': - retrying_count += 1 + else: + # Task ID in queue but not in download_tasks - treat as completed to prevent blocking + print(f"โš ๏ธ [Orphaned Task] Task {task_id} in queue but not in download_tasks - counting as finished") + finished_count += 1 all_tasks_truly_finished = finished_count >= len(queue) has_retrying_tasks = retrying_count > 0 @@ -9944,8 +10013,9 @@ def _on_download_completed(batch_id, task_id, success=True): # FIXED: Ensure batch is not already marked as complete to prevent duplicate processing if batch.get('phase') != 'complete': - # Mark batch as complete and process wishlist outside of lock to prevent deadlocks + # Mark batch as complete and set completion timestamp for auto-cleanup batch['phase'] = 'complete' + batch['completion_time'] = time.time() # Track when batch completed # Add activity for batch completion playlist_name = batch.get('playlist_name', 'Unknown Playlist') @@ -10068,6 +10138,7 @@ def _run_full_missing_tracks_process(batch_id, playlist_id, tracks_json): if batch_id in download_batches: is_auto_batch = download_batches[batch_id].get('auto_initiated', False) download_batches[batch_id]['phase'] = 'complete' + download_batches[batch_id]['completion_time'] = time.time() # Track for auto-cleanup # Update YouTube playlist phase to 'download_complete' if this is a YouTube playlist if playlist_id.startswith('youtube_'): @@ -11721,14 +11792,37 @@ def _check_batch_completion_v2(batch_id): finished_count = 0 retrying_count = 0 queue = batch.get('queue', []) - + current_time = time.time() + for task_id in queue: if task_id in download_tasks: - task_status = download_tasks[task_id]['status'] - if task_status in ['completed', 'failed', 'cancelled']: + task = download_tasks[task_id] + task_status = task['status'] + + # STUCK DETECTION: Force fail tasks that have been in transitional states too long + if task_status == 'searching': + task_age = current_time - task.get('status_change_time', current_time) + if task_age > 600: # 10 minutes + print(f"โฐ [Stuck Detection V2] Task {task_id} stuck in searching for {task_age:.0f}s - forcing failure") + task['status'] = 'failed' + task['error_message'] = f'Retry timeout after {task_age:.0f} seconds' + finished_count += 1 + else: + retrying_count += 1 + elif task_status == 'post_processing': + task_age = current_time - task.get('status_change_time', current_time) + if task_age > 300: # 5 minutes (post-processing should be fast) + print(f"โฐ [Stuck Detection V2] Task {task_id} stuck in post_processing for {task_age:.0f}s - forcing completion") + task['status'] = 'completed' # Assume it worked if file verification is taking too long + finished_count += 1 + else: + retrying_count += 1 + elif task_status in ['completed', 'failed', 'cancelled']: finished_count += 1 - elif task_status == 'searching': - retrying_count += 1 + else: + # Task ID in queue but not in download_tasks - treat as completed to prevent blocking + print(f"โš ๏ธ [Orphaned Task V2] Task {task_id} in queue but not in download_tasks - counting as finished") + finished_count += 1 all_tasks_truly_finished = finished_count >= len(queue) has_retrying_tasks = retrying_count > 0 @@ -11739,12 +11833,13 @@ def _check_batch_completion_v2(batch_id): # FIXED: Ensure batch is not already marked as complete to prevent duplicate processing if batch.get('phase') != 'complete': print(f"๐ŸŽ‰ [Completion Check V2] Batch {batch_id} is complete - marking as finished") - + # Check if this is an auto-initiated batch is_auto_batch = batch.get('auto_initiated', False) - - # Mark batch as complete + + # Mark batch as complete and set completion timestamp for auto-cleanup batch['phase'] = 'complete' + batch['completion_time'] = time.time() # Track when batch completed else: print(f"โœ… [Completion Check V2] Batch {batch_id} already marked complete - skipping duplicate processing") return True # Already complete