Add auto-cleanup and stuck task detection for batches

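Implements automatic cleanup of batches whose phase is 'complete', 'error', or 'cancelled': once such a batch has been finished for more than 5 minutes, the batch and the tasks in its queue are removed to prevent stale state. Adds stuck-task detection for the 'searching' and 'post_processing' states: a task that has been in 'searching' for more than 10 minutes is forced to 'failed', and a task that has been in 'post_processing' for more than 5 minutes is forced to 'completed'. Updates the batch completion logic to record a completion timestamp and to handle orphaned tasks more robustly (a task ID that is in a batch queue but missing from download_tasks is counted as finished so it cannot block completion). Also improves wishlist auto-processing timer management by recording the next scheduled run time when the timer is started and clearing it when the timer is stopped.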
pull/97/head
Broque Thomas 5 months ago
parent 349d653ca0
commit eae2a9c135

@ -829,13 +829,32 @@ def validate_and_heal_batch_states():
This is the server-side equivalent of the frontend's worker count validation.
"""
try:
import time
current_time = time.time()
with tasks_lock:
healed_batches = []
batches_to_cleanup = []
for batch_id, batch_data in list(download_batches.items()):
active_count = batch_data.get('active_count', 0)
queue = batch_data.get('queue', [])
phase = batch_data.get('phase', 'unknown')
# AUTO-CLEANUP: Remove completed batches after 5 minutes to prevent stale state
if phase in ['complete', 'error', 'cancelled']:
# Check if batch has a completion timestamp
completion_time = batch_data.get('completion_time')
if not completion_time:
# Set completion time if not set
batch_data['completion_time'] = current_time
else:
# Check if batch has been complete for >5 minutes
time_since_completion = current_time - completion_time
if time_since_completion > 300: # 5 minutes
print(f"🧹 [Auto-Cleanup] Removing stale completed batch {batch_id} (completed {time_since_completion:.0f}s ago)")
batches_to_cleanup.append(batch_id)
continue # Skip other healing logic for this batch
# Count actually active tasks
actually_active = 0
@ -844,10 +863,13 @@ def validate_and_heal_batch_states():
for task_id in queue:
if task_id in download_tasks:
task_status = download_tasks[task_id]['status']
if task_status in ['searching', 'downloading', 'queued']:
if task_status in ['searching', 'downloading', 'queued', 'post_processing']:
actually_active += 1
elif task_status in ['failed', 'complete', 'cancelled']:
elif task_status in ['failed', 'completed', 'cancelled']:
orphaned_tasks.append(task_id)
else:
# Task in queue but not in download_tasks dict
orphaned_tasks.append(task_id)
# Check for inconsistencies
if active_count != actually_active:
@ -870,7 +892,28 @@ def validate_and_heal_batch_states():
# Clean up orphaned tasks that are blocking progress
if orphaned_tasks and phase == 'downloading':
print(f"🧹 [Batch Healing] Found {len(orphaned_tasks)} orphaned tasks in active batch {batch_id}")
# Trigger completion check to handle orphaned tasks
# Release lock temporarily to avoid deadlock
tasks_lock.release()
try:
print(f"🔄 [Batch Healing] Triggering completion check for batch with orphaned tasks")
_check_batch_completion_v2(batch_id)
finally:
tasks_lock.acquire()
# Cleanup stale batches outside the iteration loop
for batch_id in batches_to_cleanup:
task_ids_to_remove = download_batches[batch_id].get('queue', [])
del download_batches[batch_id]
# Clean up associated tasks
for task_id in task_ids_to_remove:
if task_id in download_tasks:
del download_tasks[task_id]
if batches_to_cleanup:
print(f"🗑️ [Auto-Cleanup] Removed {len(batches_to_cleanup)} stale completed batches")
if healed_batches:
print(f"✅ [Batch Healing] Healed {len(healed_batches)} batches: {healed_batches}")
@ -7580,16 +7623,17 @@ def is_watchlist_actually_scanning():
def start_wishlist_auto_processing():
"""Start automatic wishlist processing with 1-minute initial delay."""
global wishlist_auto_timer
global wishlist_auto_timer, wishlist_next_run_time
print("🚀 [Auto-Wishlist] Initializing automatic wishlist processing...")
with wishlist_timer_lock:
# Stop any existing timer to prevent duplicates
if wishlist_auto_timer is not None:
wishlist_auto_timer.cancel()
print("🔄 Starting automatic wishlist processing system (1 minute initial delay)")
wishlist_next_run_time = time.time() + 60.0 # Set timestamp for countdown display
wishlist_auto_timer = threading.Timer(60.0, _process_wishlist_automatically) # 1 minute
wishlist_auto_timer.daemon = True
wishlist_auto_timer.start()
@ -7597,7 +7641,7 @@ def start_wishlist_auto_processing():
def stop_wishlist_auto_processing():
"""Stop automatic wishlist processing and cleanup timer."""
global wishlist_auto_timer, wishlist_auto_processing, wishlist_auto_processing_timestamp
global wishlist_auto_timer, wishlist_auto_processing, wishlist_auto_processing_timestamp, wishlist_next_run_time
with wishlist_timer_lock:
if wishlist_auto_timer is not None:
@ -7607,6 +7651,7 @@ def stop_wishlist_auto_processing():
wishlist_auto_processing = False
wishlist_auto_processing_timestamp = 0
wishlist_next_run_time = 0 # Clear countdown timer
def schedule_next_wishlist_processing():
"""Schedule next automatic wishlist processing in 30 minutes."""
@ -9741,9 +9786,10 @@ def _process_failed_tracks_to_wishlist_exact(batch_id):
with tasks_lock:
if batch_id in download_batches:
download_batches[batch_id]['phase'] = 'complete'
download_batches[batch_id]['completion_time'] = time.time() # Track for auto-cleanup
download_batches[batch_id]['wishlist_summary'] = {
'tracks_added': 0,
'errors': 1,
'tracks_added': 0,
'errors': 1,
'total_failed': 0,
'error_message': str(e)
}
@ -9913,18 +9959,41 @@ def _on_download_completed(batch_id, task_id, success=True):
no_active_workers = batch['active_count'] == 0
# Count actually finished tasks (completed, failed, or cancelled)
# CRITICAL: Don't include 'post_processing' as finished - it's still in progress!
# CRITICAL: Don't include 'searching' as finished - task is being retried!
# CRITICAL: Don't include 'post_processing' as finished - it's still in progress (unless stuck)!
# CRITICAL: Don't include 'searching' as finished - task is being retried (unless stuck)!
finished_count = 0
retrying_count = 0
queue = batch.get('queue', [])
current_time = time.time()
for task_id in queue:
if task_id in download_tasks:
task_status = download_tasks[task_id]['status']
if task_status in ['completed', 'failed', 'cancelled']:
task = download_tasks[task_id]
task_status = task['status']
# STUCK DETECTION: Force fail tasks that have been in transitional states too long
if task_status == 'searching':
task_age = current_time - task.get('status_change_time', current_time)
if task_age > 600: # 10 minutes
print(f"⏰ [Stuck Detection] Task {task_id} stuck in searching for {task_age:.0f}s - forcing failure")
task['status'] = 'failed'
task['error_message'] = f'Retry timeout after {task_age:.0f} seconds'
finished_count += 1
else:
retrying_count += 1
elif task_status == 'post_processing':
task_age = current_time - task.get('status_change_time', current_time)
if task_age > 300: # 5 minutes (post-processing should be fast)
print(f"⏰ [Stuck Detection] Task {task_id} stuck in post_processing for {task_age:.0f}s - forcing completion")
task['status'] = 'completed' # Assume it worked if file verification is taking too long
finished_count += 1
else:
retrying_count += 1
elif task_status in ['completed', 'failed', 'cancelled']:
finished_count += 1
elif task_status == 'searching':
retrying_count += 1
else:
# Task ID in queue but not in download_tasks - treat as completed to prevent blocking
print(f"⚠️ [Orphaned Task] Task {task_id} in queue but not in download_tasks - counting as finished")
finished_count += 1
all_tasks_truly_finished = finished_count >= len(queue)
has_retrying_tasks = retrying_count > 0
@ -9944,8 +10013,9 @@ def _on_download_completed(batch_id, task_id, success=True):
# FIXED: Ensure batch is not already marked as complete to prevent duplicate processing
if batch.get('phase') != 'complete':
# Mark batch as complete and process wishlist outside of lock to prevent deadlocks
# Mark batch as complete and set completion timestamp for auto-cleanup
batch['phase'] = 'complete'
batch['completion_time'] = time.time() # Track when batch completed
# Add activity for batch completion
playlist_name = batch.get('playlist_name', 'Unknown Playlist')
@ -10068,6 +10138,7 @@ def _run_full_missing_tracks_process(batch_id, playlist_id, tracks_json):
if batch_id in download_batches:
is_auto_batch = download_batches[batch_id].get('auto_initiated', False)
download_batches[batch_id]['phase'] = 'complete'
download_batches[batch_id]['completion_time'] = time.time() # Track for auto-cleanup
# Update YouTube playlist phase to 'download_complete' if this is a YouTube playlist
if playlist_id.startswith('youtube_'):
@ -11721,14 +11792,37 @@ def _check_batch_completion_v2(batch_id):
finished_count = 0
retrying_count = 0
queue = batch.get('queue', [])
current_time = time.time()
for task_id in queue:
if task_id in download_tasks:
task_status = download_tasks[task_id]['status']
if task_status in ['completed', 'failed', 'cancelled']:
task = download_tasks[task_id]
task_status = task['status']
# STUCK DETECTION: Force fail tasks that have been in transitional states too long
if task_status == 'searching':
task_age = current_time - task.get('status_change_time', current_time)
if task_age > 600: # 10 minutes
print(f"⏰ [Stuck Detection V2] Task {task_id} stuck in searching for {task_age:.0f}s - forcing failure")
task['status'] = 'failed'
task['error_message'] = f'Retry timeout after {task_age:.0f} seconds'
finished_count += 1
else:
retrying_count += 1
elif task_status == 'post_processing':
task_age = current_time - task.get('status_change_time', current_time)
if task_age > 300: # 5 minutes (post-processing should be fast)
print(f"⏰ [Stuck Detection V2] Task {task_id} stuck in post_processing for {task_age:.0f}s - forcing completion")
task['status'] = 'completed' # Assume it worked if file verification is taking too long
finished_count += 1
else:
retrying_count += 1
elif task_status in ['completed', 'failed', 'cancelled']:
finished_count += 1
elif task_status == 'searching':
retrying_count += 1
else:
# Task ID in queue but not in download_tasks - treat as completed to prevent blocking
print(f"⚠️ [Orphaned Task V2] Task {task_id} in queue but not in download_tasks - counting as finished")
finished_count += 1
all_tasks_truly_finished = finished_count >= len(queue)
has_retrying_tasks = retrying_count > 0
@ -11739,12 +11833,13 @@ def _check_batch_completion_v2(batch_id):
# FIXED: Ensure batch is not already marked as complete to prevent duplicate processing
if batch.get('phase') != 'complete':
print(f"🎉 [Completion Check V2] Batch {batch_id} is complete - marking as finished")
# Check if this is an auto-initiated batch
is_auto_batch = batch.get('auto_initiated', False)
# Mark batch as complete
# Mark batch as complete and set completion timestamp for auto-cleanup
batch['phase'] = 'complete'
batch['completion_time'] = time.time() # Track when batch completed
else:
print(f"✅ [Completion Check V2] Batch {batch_id} already marked complete - skipping duplicate processing")
return True # Already complete

Loading…
Cancel
Save