@ -121,6 +121,7 @@ missing_download_executor = ThreadPoolExecutor(max_workers=3, thread_name_prefix
download_tasks = { } # task_id -> task state dict
download_batches = { } # batch_id -> {queue, active_count, max_concurrent}
tasks_lock = threading . Lock ( )
batch_locks = { } # batch_id -> Lock() for atomic batch operations
# --- Automatic Wishlist Processing Infrastructure ---
# Server-side timer system for automatic wishlist processing (replaces client-side JavaScript timers)
@ -251,6 +252,9 @@ class WebUIDownloadMonitor:
for batch_id , task_id , task in tasks_to_retry :
print ( f " 🔄 Monitor triggered retry for task { task_id } " )
self . _trigger_retry ( batch_id , task_id , task )
# ENHANCED: Add worker count validation to detect ghost workers
self . _validate_worker_counts ( )
def _get_live_transfers ( self ) :
""" Get current transfer status from slskd API """
@ -550,10 +554,135 @@ class WebUIDownloadMonitor:
except Exception as e :
print ( f " ❌ Error in retry with fallback for task { task_id } : { e } " )
def _validate_worker_counts ( self ) :
"""
Validate worker counts to detect and fix ghost workers or orphaned tasks .
This prevents the modal from showing wrong worker counts permanently .
"""
try :
with tasks_lock :
for batch_id in list ( self . monitored_batches ) :
if batch_id not in download_batches :
continue
batch = download_batches [ batch_id ]
reported_active = batch [ ' active_count ' ]
max_concurrent = batch [ ' max_concurrent ' ]
queue = batch . get ( ' queue ' , [ ] )
queue_index = batch . get ( ' queue_index ' , 0 )
# Count actually active tasks based on status
actually_active = 0
orphaned_tasks = [ ]
for task_id in queue :
if task_id in download_tasks :
task_status = download_tasks [ task_id ] [ ' status ' ]
if task_status in [ ' searching ' , ' downloading ' , ' queued ' ] :
actually_active + = 1
elif task_status in [ ' failed ' , ' complete ' , ' cancelled ' ] and task_id in queue [ queue_index : ] :
# These are orphaned tasks - they're done but still in active queue
orphaned_tasks . append ( task_id )
# Check for discrepancies
if reported_active != actually_active or orphaned_tasks :
print ( f " 🔍 [Worker Validation] Batch { batch_id } : reported= { reported_active } , actual= { actually_active } , orphaned= { len ( orphaned_tasks ) } " )
if orphaned_tasks :
print ( f " 🧹 [Worker Validation] Found { len ( orphaned_tasks ) } orphaned tasks to cleanup " )
# Fix the active count if it's wrong
if reported_active != actually_active :
old_count = batch [ ' active_count ' ]
batch [ ' active_count ' ] = actually_active
print ( f " ✅ [Worker Validation] Fixed active count: { old_count } → { actually_active } " )
# If we freed up slots and have more work, try to start new workers
if actually_active < max_concurrent and queue_index < len ( queue ) :
print ( f " 🔄 [Worker Validation] Starting replacement workers " )
# Release lock temporarily to avoid deadlock
tasks_lock . release ( )
try :
_start_next_batch_of_downloads ( batch_id )
finally :
tasks_lock . acquire ( )
except Exception as validation_error :
print ( f " ❌ Error in worker count validation: { validation_error } " )
# Global download monitor instance
download_monitor = WebUIDownloadMonitor ( )
def validate_and_heal_batch_states ( ) :
"""
Periodic validation and healing of batch states to prevent permanent inconsistencies .
This is the server - side equivalent of the frontend ' s worker count validation.
"""
try :
with tasks_lock :
healed_batches = [ ]
for batch_id , batch_data in list ( download_batches . items ( ) ) :
active_count = batch_data . get ( ' active_count ' , 0 )
queue = batch_data . get ( ' queue ' , [ ] )
phase = batch_data . get ( ' phase ' , ' unknown ' )
# Count actually active tasks
actually_active = 0
orphaned_tasks = [ ]
for task_id in queue :
if task_id in download_tasks :
task_status = download_tasks [ task_id ] [ ' status ' ]
if task_status in [ ' searching ' , ' downloading ' , ' queued ' ] :
actually_active + = 1
elif task_status in [ ' failed ' , ' complete ' , ' cancelled ' ] :
orphaned_tasks . append ( task_id )
# Check for inconsistencies
if active_count != actually_active :
print ( f " 🔧 [Batch Healing] { batch_id } : fixing active count { active_count } → { actually_active } " )
batch_data [ ' active_count ' ] = actually_active
healed_batches . append ( batch_id )
# If we freed up slots, try to start more workers
if actually_active < batch_data . get ( ' max_concurrent ' , 3 ) :
queue_index = batch_data . get ( ' queue_index ' , 0 )
if queue_index < len ( queue ) :
print ( f " 🔄 [Batch Healing] Starting replacement workers for { batch_id } " )
# Release lock temporarily to avoid deadlock
tasks_lock . release ( )
try :
_start_next_batch_of_downloads ( batch_id )
finally :
tasks_lock . acquire ( )
# Clean up orphaned tasks that are blocking progress
if orphaned_tasks and phase == ' downloading ' :
print ( f " 🧹 [Batch Healing] Found { len ( orphaned_tasks ) } orphaned tasks in active batch { batch_id } " )
if healed_batches :
print ( f " ✅ [Batch Healing] Healed { len ( healed_batches ) } batches: { healed_batches } " )
except Exception as healing_error :
print ( f " ❌ [Batch Healing] Error during validation: { healing_error } " )
# Start periodic batch healing (every 30 seconds)
import threading
def start_batch_healing_timer ( ) :
""" Start periodic batch state validation and healing """
try :
validate_and_heal_batch_states ( )
except Exception as e :
print ( f " ❌ [Batch Healing Timer] Error: { e } " )
finally :
# Schedule next healing cycle
threading . Timer ( 30.0 , start_batch_healing_timer ) . start ( )
# Start the healing timer when the server starts
start_batch_healing_timer ( )
# Cleanup handler for Flask shutdown/reload
import atexit
import signal
@ -567,6 +696,11 @@ def cleanup_monitor():
download_monitor . monitored_batches . clear ( )
# Give the thread a moment to exit cleanly
time . sleep ( 0.5 )
# Clean up batch locks to prevent memory leaks
with tasks_lock :
batch_locks . clear ( )
print ( " 🧹 Cleaned up batch locks " )
def signal_handler ( signum , frame ) :
""" Handle SIGINT (Ctrl+C) and SIGTERM """
@ -4684,41 +4818,124 @@ def get_valid_candidates(results, spotify_track, query):
verified_candidates . append ( candidate )
return verified_candidates
def _start_next_batch_of_downloads ( batch_id ) :
""" Start the next batch of downloads up to the concurrent limit (like GUI) """
with tasks_lock :
if batch_id not in download_batches :
return
batch = download_batches [ batch_id ]
max_concurrent = batch [ ' max_concurrent ' ]
queue = batch [ ' queue ' ]
queue_index = batch [ ' queue_index ' ]
active_count = batch [ ' active_count ' ]
def _recover_worker_slot ( batch_id , task_id ) :
"""
Emergency worker slot recovery function for when normal completion callback fails .
This prevents permanent worker slot leaks that cause modal to show wrong worker counts .
"""
try :
print ( f " 🚨 [Worker Recovery] Attempting to recover worker slot for batch { batch_id } , task { task_id } " )
# Start downloads up to the concurrent limit
while active_count < max_concurrent and queue_index < len ( queue ) :
task_id = queue [ queue_index ]
# Acquire lock with timeout to prevent deadlock
lock_acquired = tasks_lock . acquire ( timeout = 3.0 )
if not lock_acquired :
print ( f " 💀 [Worker Recovery] FATAL: Could not acquire lock for recovery - worker slot LEAKED " )
return False
# IMPORTANT: Set status to 'searching' BEFORE starting worker (like GUI)
# Must be done INSIDE the lock to prevent race conditions with status polling
if task_id in download_tasks :
download_tasks [ task_id ] [ ' status ' ] = ' searching '
download_tasks [ task_id ] [ ' status_change_time ' ] = time . time ( )
print ( f " 🔧 [Batch Manager] Set task { task_id } status to ' searching ' " )
try :
# Verify batch still exists
if batch_id not in download_batches :
print ( f " ⚠️ [Worker Recovery] Batch { batch_id } not found - nothing to recover " )
return True
batch = download_batches [ batch_id ]
old_active = batch [ ' active_count ' ]
# Only decrement if there are active workers to prevent negative counts
if old_active > 0 :
batch [ ' active_count ' ] - = 1
new_active = batch [ ' active_count ' ]
print ( f " ✅ [Worker Recovery] Recovered worker slot - Active count: { old_active } → { new_active } " )
# Try to start next worker if queue isn't empty
if batch [ ' queue_index ' ] < len ( batch [ ' queue ' ] ) and new_active < batch [ ' max_concurrent ' ] :
print ( f " 🔄 [Worker Recovery] Attempting to start replacement worker " )
# Release lock temporarily to avoid deadlock in _start_next_batch_of_downloads
tasks_lock . release ( )
try :
_start_next_batch_of_downloads ( batch_id )
finally :
# Re-acquire lock for final cleanup
tasks_lock . acquire ( timeout = 2.0 )
return True
else :
print ( f " ⚠️ [Worker Recovery] Active count already 0 - no recovery needed " )
return True
finally :
tasks_lock . release ( )
# Update counters
download_batches [ batch_id ] [ ' active_count ' ] + = 1
download_batches [ batch_id ] [ ' queue_index ' ] + = 1
except Exception as recovery_error :
print ( f " 💀 [Worker Recovery] FATAL ERROR in recovery: { recovery_error } " )
return False
def _get_batch_lock ( batch_id ) :
""" Get or create a lock for a specific batch to prevent race conditions """
with tasks_lock :
if batch_id not in batch_locks :
batch_locks [ batch_id ] = threading . Lock ( )
return batch_locks [ batch_id ]
def _start_next_batch_of_downloads ( batch_id ) :
""" Start the next batch of downloads up to the concurrent limit (like GUI) """
# ENHANCED: Use batch-specific lock to prevent race conditions when multiple threads
# try to start workers for the same batch concurrently
batch_lock = _get_batch_lock ( batch_id )
with batch_lock :
with tasks_lock :
if batch_id not in download_batches :
return
batch = download_batches [ batch_id ]
max_concurrent = batch [ ' max_concurrent ' ]
queue = batch [ ' queue ' ]
queue_index = batch [ ' queue_index ' ]
active_count = batch [ ' active_count ' ]
print ( f " 🔄 [Batch Manager] Starting download { queue_index + 1 } / { len ( queue ) } - Active: { active_count + 1 } / { max_concurrent } " )
print ( f " 🔍 [Batch Lock] Starting workers for { batch_id } : active= { active_count } , max= { max_concurrent } , queue_pos= { queue_index } / { len ( queue ) } " )
# Submit to executor
missing_download_executor . submit ( _download_track_worker , task_id , batch_id )
# Start downloads up to the concurrent limit
while active_count < max_concurrent and queue_index < len ( queue ) :
task_id = queue [ queue_index ]
# IMPORTANT: Set status to 'searching' BEFORE starting worker (like GUI)
# Must be done INSIDE the lock to prevent race conditions with status polling
if task_id in download_tasks :
download_tasks [ task_id ] [ ' status ' ] = ' searching '
download_tasks [ task_id ] [ ' status_change_time ' ] = time . time ( )
print ( f " 🔧 [Batch Manager] Set task { task_id } status to ' searching ' " )
# CRITICAL FIX: Submit to executor BEFORE incrementing counters to prevent ghost workers
try :
# Submit to executor first - this can fail
future = missing_download_executor . submit ( _download_track_worker , task_id , batch_id )
# Only increment counters AFTER successful submit
download_batches [ batch_id ] [ ' active_count ' ] + = 1
download_batches [ batch_id ] [ ' queue_index ' ] + = 1
print ( f " 🔄 [Batch Lock] Started download { queue_index + 1 } / { len ( queue ) } - Active: { active_count + 1 } / { max_concurrent } " )
# Update local counters for next iteration
active_count + = 1
queue_index + = 1
except Exception as submit_error :
print ( f " ❌ [Batch Lock] CRITICAL: Failed to submit task { task_id } to executor: { submit_error } " )
print ( f " 🚨 [Batch Lock] Worker slot NOT consumed - preventing ghost worker " )
# Reset task status since worker never started
if task_id in download_tasks :
download_tasks [ task_id ] [ ' status ' ] = ' failed '
print ( f " 🔧 [Batch Lock] Set task { task_id } status to ' failed ' due to submit failure " )
# Don't increment counters - no worker was actually started
# This prevents the "ghost worker" issue where active_count is incremented but no actual worker runs
break # Stop trying to start more workers if executor is failing
# Update local counters for next iteration
active_count + = 1
queue_index + = 1
print ( f " ✅ [Batch Lock] Finished starting workers for { batch_id } : final_active= { download_batches [ batch_id ] [ ' active_count ' ] } , max= { max_concurrent } " )
def _get_track_artist_name ( track_info ) :
""" Extract artist name from track info, handling different data formats (replicating sync.py) """
@ -5377,21 +5594,47 @@ def _download_track_worker(task_id, batch_id=None):
if task_id in download_tasks :
download_tasks [ task_id ] [ ' status ' ] = ' failed '
# Notify batch manager that this task completed (failed)
# Notify batch manager that this task completed (failed) - THREAD SAFE
if batch_id :
_on_download_completed ( batch_id , task_id , success = False )
try :
_on_download_completed ( batch_id , task_id , success = False )
except Exception as completion_error :
print ( f " ❌ Error in batch completion callback for { task_id } : { completion_error } " )
except Exception as e :
import traceback
print ( f " ❌ CRITICAL ERROR in download task for ' { track_name } ' (task_id: { task_id } ): { e } " )
track_name_safe = locals ( ) . get ( ' track_name ' , ' unknown ' ) # Safe fallback for track_name
print ( f " ❌ CRITICAL ERROR in download task for ' { track_name_safe } ' (task_id: { task_id } ): { e } " )
traceback . print_exc ( )
with tasks_lock :
if task_id in download_tasks :
download_tasks [ task_id ] [ ' status ' ] = ' failed '
# Notify batch manager that this task completed (failed)
# Update task status safely with timeout
try :
lock_acquired = tasks_lock . acquire ( timeout = 2.0 )
if lock_acquired :
try :
if task_id in download_tasks :
download_tasks [ task_id ] [ ' status ' ] = ' failed '
print ( f " 🔧 [Exception Recovery] Set task { task_id } status to ' failed ' " )
finally :
tasks_lock . release ( )
else :
print ( f " ⚠️ [Exception Recovery] Could not acquire lock to update task { task_id } status " )
except Exception as status_error :
print ( f " ❌ Error updating task status in exception handler: { status_error } " )
# Notify batch manager that this task completed (failed) - THREAD SAFE with RECOVERY
if batch_id :
_on_download_completed ( batch_id , task_id , success = False )
try :
_on_download_completed ( batch_id , task_id , success = False )
print ( f " ✅ [Exception Recovery] Successfully freed worker slot for task { task_id } " )
except Exception as completion_error :
print ( f " ❌ [Exception Recovery] Error in batch completion callback for { task_id } : { completion_error } " )
# CRITICAL: If batch completion fails, we need to manually recover the worker slot
try :
print ( f " 🚨 [Exception Recovery] Attempting manual worker slot recovery for batch { batch_id } " )
_recover_worker_slot ( batch_id , task_id )
except Exception as recovery_error :
print ( f " 💀 [Exception Recovery] FATAL: Could not recover worker slot: { recovery_error } " )
def _attempt_download_with_candidates ( task_id , candidates , track , batch_id = None ) :
"""
@ -5709,7 +5952,9 @@ def _build_batch_status_data(batch_id, batch, live_transfers_lookup):
# Don't override tasks that are already completed/failed/cancelled
if task [ ' status ' ] not in [ ' completed ' , ' failed ' , ' cancelled ' ] :
if ' Completed ' in state_str or ' Succeeded ' in state_str :
if ' Completed ' in state_str or ' Succeeded ' in state_str :
# SYNC.PY PARITY: Mark as completed based solely on Soulseek API status
# This matches sync.py's behavior exactly (no additional file verification)
task_status [ ' status ' ] = ' completed '
# Permanently update the stored task status
task [ ' status ' ] = ' completed '
@ -5821,7 +6066,32 @@ def get_batched_download_statuses():
" timestamp " : time . time ( )
}
# ENHANCED: Add comprehensive debug info for worker tracking
debug_info = { }
for batch_id , batch_status in response [ " batches " ] . items ( ) :
if " error " not in batch_status :
active_count = batch_status . get ( " active_count " , 0 )
max_concurrent = batch_status . get ( " max_concurrent " , 3 )
task_count = len ( batch_status . get ( " tasks " , [ ] ) )
active_tasks = len ( [ t for t in batch_status . get ( " tasks " , [ ] ) if t . get ( " status " ) in [ ' searching ' , ' downloading ' , ' queued ' ] ] )
debug_info [ batch_id ] = {
" reported_active " : active_count ,
" actual_active_tasks " : active_tasks ,
" max_concurrent " : max_concurrent ,
" total_tasks " : task_count ,
" worker_discrepancy " : active_count != active_tasks
}
response [ " debug_info " ] = debug_info
print ( f " 📊 [Batched Status] Returning status for { len ( response [ ' batches ' ] ) } batches " )
# Log worker discrepancies for debugging
discrepancies = [ bid for bid , info in debug_info . items ( ) if info . get ( " worker_discrepancy " ) ]
if discrepancies :
print ( f " ⚠️ [Batched Status] Worker count discrepancies in batches: { discrepancies } " )
return jsonify ( response )
except Exception as e :
@ -5857,20 +6127,48 @@ def cancel_download_task():
# Immediately mark as cancelled to prevent race conditions
task [ ' status ' ] = ' cancelled '
# CRITICAL FIX: Only free worker slot if task is still being actively worked on
# Avoid double-decrementing active count for downloads that already started
# IMPROVED WORKER SLOT MANAGEMENT: Use batch state validation instead of task status
batch_id = task . get ( ' batch_id ' )
should_free_worker = False
worker_slot_freed = False
if batch_id :
# Only free worker slot if task is pending/searching (worker hasn't completed yet)
# Tasks with status 'downloading'/'queued' have already had their worker freed
if current_status in [ ' pending ' , ' searching ' ] :
should_free_worker = True
print ( f " 🔄 [Cancel] Task { task_id } (status: { current_status } ) - freeing worker slot for batch { batch_id } " )
_on_download_completed ( batch_id , task_id , success = False )
else :
print ( f " 🚫 [Cancel] Task { task_id } (status: { current_status } ) - worker already completed, not freeing slot " )
try :
# Check if we need to free a worker slot by examining batch state
with tasks_lock :
if batch_id in download_batches :
batch = download_batches [ batch_id ]
active_count = batch [ ' active_count ' ]
# Free worker slot if there are active workers and task was actively running
# This is more reliable than checking task status which can be inconsistent
if active_count > 0 and current_status in [ ' pending ' , ' searching ' , ' downloading ' , ' queued ' ] :
print ( f " 🔄 [Cancel] Task { task_id } (status: { current_status } ) - freeing worker slot for batch { batch_id } " )
print ( f " 🔄 [Cancel] Active count before: { active_count } " )
# Use the completion callback with error handling
_on_download_completed ( batch_id , task_id , success = False )
worker_slot_freed = True
# Verify slot was actually freed
new_active = download_batches [ batch_id ] [ ' active_count ' ]
print ( f " 🔄 [Cancel] Active count after: { new_active } " )
elif active_count == 0 :
print ( f " 🚫 [Cancel] Task { task_id } - no active workers to free " )
else :
print ( f " 🚫 [Cancel] Task { task_id } (status: { current_status } ) - not actively running, no slot to free " )
else :
print ( f " 🚫 [Cancel] Task { task_id } - batch { batch_id } not found " )
except Exception as slot_error :
print ( f " ❌ [Cancel] Error managing worker slot for { task_id } : { slot_error } " )
# Attempt emergency recovery if normal completion failed
if not worker_slot_freed :
try :
print ( f " 🚨 [Cancel] Attempting emergency worker slot recovery " )
_recover_worker_slot ( batch_id , task_id )
except Exception as recovery_error :
print ( f " 💀 [Cancel] FATAL: Emergency recovery failed: { recovery_error } " )
else :
print ( f " 🚫 [Cancel] Task { task_id } cancelled (no batch_id - likely already completed) " )