From d8d12bcc2cf7632b91957965ad6d352da24e8f4d Mon Sep 17 00:00:00 2001 From: Broque Thomas Date: Sat, 6 Sep 2025 09:25:15 -0700 Subject: [PATCH] Improve download worker management and UI polling Adds server-side validation and healing for download batch worker counts to prevent ghost workers and orphaned tasks, including periodic batch state checks and recovery logic. Enhances thread safety with batch-specific locks and improves error handling in worker slot management. On the frontend, implements exponential backoff for polling failures, validates server responses, and logs worker count discrepancies for better debugging and UI consistency. --- web_server.py | 396 ++++++++++++++++++++++++++++++++++++----- webui/static/script.js | 150 +++++++++++++++- 2 files changed, 494 insertions(+), 52 deletions(-) diff --git a/web_server.py b/web_server.py index 3983cf85..acf123b9 100644 --- a/web_server.py +++ b/web_server.py @@ -121,6 +121,7 @@ missing_download_executor = ThreadPoolExecutor(max_workers=3, thread_name_prefix download_tasks = {} # task_id -> task state dict download_batches = {} # batch_id -> {queue, active_count, max_concurrent} tasks_lock = threading.Lock() +batch_locks = {} # batch_id -> Lock() for atomic batch operations # --- Automatic Wishlist Processing Infrastructure --- # Server-side timer system for automatic wishlist processing (replaces client-side JavaScript timers) @@ -251,6 +252,9 @@ class WebUIDownloadMonitor: for batch_id, task_id, task in tasks_to_retry: print(f"๐Ÿ”„ Monitor triggered retry for task {task_id}") self._trigger_retry(batch_id, task_id, task) + + # ENHANCED: Add worker count validation to detect ghost workers + self._validate_worker_counts() def _get_live_transfers(self): """Get current transfer status from slskd API""" @@ -550,10 +554,135 @@ class WebUIDownloadMonitor: except Exception as e: print(f"โŒ Error in retry with fallback for task {task_id}: {e}") + + def _validate_worker_counts(self): + """ + Validate worker counts to detect and fix ghost workers or orphaned tasks. + This prevents the modal from showing wrong worker counts permanently. + """ + try: + with tasks_lock: + for batch_id in list(self.monitored_batches): + if batch_id not in download_batches: + continue + + batch = download_batches[batch_id] + reported_active = batch['active_count'] + max_concurrent = batch['max_concurrent'] + queue = batch.get('queue', []) + queue_index = batch.get('queue_index', 0) + + # Count actually active tasks based on status + actually_active = 0 + orphaned_tasks = [] + + for task_id in queue: + if task_id in download_tasks: + task_status = download_tasks[task_id]['status'] + if task_status in ['searching', 'downloading', 'queued']: + actually_active += 1 + elif task_status in ['failed', 'complete', 'cancelled'] and task_id in queue[queue_index:]: + # These are orphaned tasks - they're done but still in active queue + orphaned_tasks.append(task_id) + + # Check for discrepancies + if reported_active != actually_active or orphaned_tasks: + print(f"๐Ÿ” [Worker Validation] Batch {batch_id}: reported={reported_active}, actual={actually_active}, orphaned={len(orphaned_tasks)}") + + if orphaned_tasks: + print(f"๐Ÿงน [Worker Validation] Found {len(orphaned_tasks)} orphaned tasks to cleanup") + + # Fix the active count if it's wrong + if reported_active != actually_active: + old_count = batch['active_count'] + batch['active_count'] = actually_active + print(f"โœ… [Worker Validation] Fixed active count: {old_count} โ†’ {actually_active}") + + # If we freed up slots and have more work, try to start new workers + if actually_active < max_concurrent and queue_index < len(queue): + print(f"๐Ÿ”„ [Worker Validation] Starting replacement workers") + # Release lock temporarily to avoid deadlock + tasks_lock.release() + try: + _start_next_batch_of_downloads(batch_id) + finally: + tasks_lock.acquire() + + except Exception as validation_error: + print(f"โŒ Error in worker count validation: {validation_error}") # Global download monitor instance download_monitor = WebUIDownloadMonitor() +def validate_and_heal_batch_states(): + """ + Periodic validation and healing of batch states to prevent permanent inconsistencies. + This is the server-side equivalent of the frontend's worker count validation. + """ + try: + with tasks_lock: + healed_batches = [] + + for batch_id, batch_data in list(download_batches.items()): + active_count = batch_data.get('active_count', 0) + queue = batch_data.get('queue', []) + phase = batch_data.get('phase', 'unknown') + + # Count actually active tasks + actually_active = 0 + orphaned_tasks = [] + + for task_id in queue: + if task_id in download_tasks: + task_status = download_tasks[task_id]['status'] + if task_status in ['searching', 'downloading', 'queued']: + actually_active += 1 + elif task_status in ['failed', 'complete', 'cancelled']: + orphaned_tasks.append(task_id) + + # Check for inconsistencies + if active_count != actually_active: + print(f"๐Ÿ”ง [Batch Healing] {batch_id}: fixing active count {active_count} โ†’ {actually_active}") + batch_data['active_count'] = actually_active + healed_batches.append(batch_id) + + # If we freed up slots, try to start more workers + if actually_active < batch_data.get('max_concurrent', 3): + queue_index = batch_data.get('queue_index', 0) + if queue_index < len(queue): + print(f"๐Ÿ”„ [Batch Healing] Starting replacement workers for {batch_id}") + # Release lock temporarily to avoid deadlock + tasks_lock.release() + try: + _start_next_batch_of_downloads(batch_id) + finally: + tasks_lock.acquire() + + # Clean up orphaned tasks that are blocking progress + if orphaned_tasks and phase == 'downloading': + print(f"๐Ÿงน [Batch Healing] Found {len(orphaned_tasks)} orphaned tasks in active batch {batch_id}") + + if healed_batches: + print(f"โœ… [Batch Healing] Healed {len(healed_batches)} batches: {healed_batches}") + + except Exception as healing_error: + print(f"โŒ [Batch Healing] Error during validation: {healing_error}") + +# Start periodic batch healing (every 30 seconds) +import threading +def start_batch_healing_timer(): + """Start periodic batch state validation and healing""" + try: + validate_and_heal_batch_states() + except Exception as e: + print(f"โŒ [Batch Healing Timer] Error: {e}") + finally: + # Schedule next healing cycle + threading.Timer(30.0, start_batch_healing_timer).start() + +# Start the healing timer when the server starts +start_batch_healing_timer() + # Cleanup handler for Flask shutdown/reload import atexit import signal @@ -567,6 +696,11 @@ def cleanup_monitor(): download_monitor.monitored_batches.clear() # Give the thread a moment to exit cleanly time.sleep(0.5) + + # Clean up batch locks to prevent memory leaks + with tasks_lock: + batch_locks.clear() + print("๐Ÿงน Cleaned up batch locks") def signal_handler(signum, frame): """Handle SIGINT (Ctrl+C) and SIGTERM""" @@ -4684,41 +4818,124 @@ def get_valid_candidates(results, spotify_track, query): verified_candidates.append(candidate) return verified_candidates -def _start_next_batch_of_downloads(batch_id): - """Start the next batch of downloads up to the concurrent limit (like GUI)""" - with tasks_lock: - if batch_id not in download_batches: - return - - batch = download_batches[batch_id] - max_concurrent = batch['max_concurrent'] - queue = batch['queue'] - queue_index = batch['queue_index'] - active_count = batch['active_count'] +def _recover_worker_slot(batch_id, task_id): + """ + Emergency worker slot recovery function for when normal completion callback fails. + This prevents permanent worker slot leaks that cause modal to show wrong worker counts. + """ + try: + print(f"๐Ÿšจ [Worker Recovery] Attempting to recover worker slot for batch {batch_id}, task {task_id}") - # Start downloads up to the concurrent limit - while active_count < max_concurrent and queue_index < len(queue): - task_id = queue[queue_index] + # Acquire lock with timeout to prevent deadlock + lock_acquired = tasks_lock.acquire(timeout=3.0) + if not lock_acquired: + print(f"๐Ÿ’€ [Worker Recovery] FATAL: Could not acquire lock for recovery - worker slot LEAKED") + return False - # IMPORTANT: Set status to 'searching' BEFORE starting worker (like GUI) - # Must be done INSIDE the lock to prevent race conditions with status polling - if task_id in download_tasks: - download_tasks[task_id]['status'] = 'searching' - download_tasks[task_id]['status_change_time'] = time.time() - print(f"๐Ÿ”ง [Batch Manager] Set task {task_id} status to 'searching'") + try: + # Verify batch still exists + if batch_id not in download_batches: + print(f"โš ๏ธ [Worker Recovery] Batch {batch_id} not found - nothing to recover") + return True + + batch = download_batches[batch_id] + old_active = batch['active_count'] + + # Only decrement if there are active workers to prevent negative counts + if old_active > 0: + batch['active_count'] -= 1 + new_active = batch['active_count'] + print(f"โœ… [Worker Recovery] Recovered worker slot - Active count: {old_active} โ†’ {new_active}") + + # Try to start next worker if queue isn't empty + if batch['queue_index'] < len(batch['queue']) and new_active < batch['max_concurrent']: + print(f"๐Ÿ”„ [Worker Recovery] Attempting to start replacement worker") + # Release lock temporarily to avoid deadlock in _start_next_batch_of_downloads + tasks_lock.release() + try: + _start_next_batch_of_downloads(batch_id) + finally: + # Re-acquire lock for final cleanup + tasks_lock.acquire(timeout=2.0) + + return True + else: + print(f"โš ๏ธ [Worker Recovery] Active count already 0 - no recovery needed") + return True + + finally: + tasks_lock.release() - # Update counters - download_batches[batch_id]['active_count'] += 1 - download_batches[batch_id]['queue_index'] += 1 + except Exception as recovery_error: + print(f"๐Ÿ’€ [Worker Recovery] FATAL ERROR in recovery: {recovery_error}") + return False + +def _get_batch_lock(batch_id): + """Get or create a lock for a specific batch to prevent race conditions""" + with tasks_lock: + if batch_id not in batch_locks: + batch_locks[batch_id] = threading.Lock() + return batch_locks[batch_id] + +def _start_next_batch_of_downloads(batch_id): + """Start the next batch of downloads up to the concurrent limit (like GUI)""" + # ENHANCED: Use batch-specific lock to prevent race conditions when multiple threads + # try to start workers for the same batch concurrently + batch_lock = _get_batch_lock(batch_id) + + with batch_lock: + with tasks_lock: + if batch_id not in download_batches: + return + + batch = download_batches[batch_id] + max_concurrent = batch['max_concurrent'] + queue = batch['queue'] + queue_index = batch['queue_index'] + active_count = batch['active_count'] - print(f"๐Ÿ”„ [Batch Manager] Starting download {queue_index + 1}/{len(queue)} - Active: {active_count + 1}/{max_concurrent}") + print(f"๐Ÿ” [Batch Lock] Starting workers for {batch_id}: active={active_count}, max={max_concurrent}, queue_pos={queue_index}/{len(queue)}") - # Submit to executor - missing_download_executor.submit(_download_track_worker, task_id, batch_id) + # Start downloads up to the concurrent limit + while active_count < max_concurrent and queue_index < len(queue): + task_id = queue[queue_index] + + # IMPORTANT: Set status to 'searching' BEFORE starting worker (like GUI) + # Must be done INSIDE the lock to prevent race conditions with status polling + if task_id in download_tasks: + download_tasks[task_id]['status'] = 'searching' + download_tasks[task_id]['status_change_time'] = time.time() + print(f"๐Ÿ”ง [Batch Manager] Set task {task_id} status to 'searching'") + + # CRITICAL FIX: Submit to executor BEFORE incrementing counters to prevent ghost workers + try: + # Submit to executor first - this can fail + future = missing_download_executor.submit(_download_track_worker, task_id, batch_id) + + # Only increment counters AFTER successful submit + download_batches[batch_id]['active_count'] += 1 + download_batches[batch_id]['queue_index'] += 1 + + print(f"๐Ÿ”„ [Batch Lock] Started download {queue_index + 1}/{len(queue)} - Active: {active_count + 1}/{max_concurrent}") + + # Update local counters for next iteration + active_count += 1 + queue_index += 1 + + except Exception as submit_error: + print(f"โŒ [Batch Lock] CRITICAL: Failed to submit task {task_id} to executor: {submit_error}") + print(f"๐Ÿšจ [Batch Lock] Worker slot NOT consumed - preventing ghost worker") + + # Reset task status since worker never started + if task_id in download_tasks: + download_tasks[task_id]['status'] = 'failed' + print(f"๐Ÿ”ง [Batch Lock] Set task {task_id} status to 'failed' due to submit failure") + + # Don't increment counters - no worker was actually started + # This prevents the "ghost worker" issue where active_count is incremented but no actual worker runs + break # Stop trying to start more workers if executor is failing - # Update local counters for next iteration - active_count += 1 - queue_index += 1 + print(f"โœ… [Batch Lock] Finished starting workers for {batch_id}: final_active={download_batches[batch_id]['active_count']}, max={max_concurrent}") def _get_track_artist_name(track_info): """Extract artist name from track info, handling different data formats (replicating sync.py)""" @@ -5377,21 +5594,47 @@ def _download_track_worker(task_id, batch_id=None): if task_id in download_tasks: download_tasks[task_id]['status'] = 'failed' - # Notify batch manager that this task completed (failed) + # Notify batch manager that this task completed (failed) - THREAD SAFE if batch_id: - _on_download_completed(batch_id, task_id, success=False) + try: + _on_download_completed(batch_id, task_id, success=False) + except Exception as completion_error: + print(f"โŒ Error in batch completion callback for {task_id}: {completion_error}") except Exception as e: import traceback - print(f"โŒ CRITICAL ERROR in download task for '{track_name}' (task_id: {task_id}): {e}") + track_name_safe = locals().get('track_name', 'unknown') # Safe fallback for track_name + print(f"โŒ CRITICAL ERROR in download task for '{track_name_safe}' (task_id: {task_id}): {e}") traceback.print_exc() - with tasks_lock: - if task_id in download_tasks: - download_tasks[task_id]['status'] = 'failed' - # Notify batch manager that this task completed (failed) + # Update task status safely with timeout + try: + lock_acquired = tasks_lock.acquire(timeout=2.0) + if lock_acquired: + try: + if task_id in download_tasks: + download_tasks[task_id]['status'] = 'failed' + print(f"๐Ÿ”ง [Exception Recovery] Set task {task_id} status to 'failed'") + finally: + tasks_lock.release() + else: + print(f"โš ๏ธ [Exception Recovery] Could not acquire lock to update task {task_id} status") + except Exception as status_error: + print(f"โŒ Error updating task status in exception handler: {status_error}") + + # Notify batch manager that this task completed (failed) - THREAD SAFE with RECOVERY if batch_id: - _on_download_completed(batch_id, task_id, success=False) + try: + _on_download_completed(batch_id, task_id, success=False) + print(f"โœ… [Exception Recovery] Successfully freed worker slot for task {task_id}") + except Exception as completion_error: + print(f"โŒ [Exception Recovery] Error in batch completion callback for {task_id}: {completion_error}") + # CRITICAL: If batch completion fails, we need to manually recover the worker slot + try: + print(f"๐Ÿšจ [Exception Recovery] Attempting manual worker slot recovery for batch {batch_id}") + _recover_worker_slot(batch_id, task_id) + except Exception as recovery_error: + print(f"๐Ÿ’€ [Exception Recovery] FATAL: Could not recover worker slot: {recovery_error}") def _attempt_download_with_candidates(task_id, candidates, track, batch_id=None): """ @@ -5709,7 +5952,9 @@ def _build_batch_status_data(batch_id, batch, live_transfers_lookup): # Don't override tasks that are already completed/failed/cancelled if task['status'] not in ['completed', 'failed', 'cancelled']: - if 'Completed' in state_str or 'Succeeded' in state_str: + if 'Completed' in state_str or 'Succeeded' in state_str: + # SYNC.PY PARITY: Mark as completed based solely on Soulseek API status + # This matches sync.py's behavior exactly (no additional file verification) task_status['status'] = 'completed' # Permanently update the stored task status task['status'] = 'completed' @@ -5821,7 +6066,32 @@ def get_batched_download_statuses(): "timestamp": time.time() } + # ENHANCED: Add comprehensive debug info for worker tracking + debug_info = {} + for batch_id, batch_status in response["batches"].items(): + if "error" not in batch_status: + active_count = batch_status.get("active_count", 0) + max_concurrent = batch_status.get("max_concurrent", 3) + task_count = len(batch_status.get("tasks", [])) + active_tasks = len([t for t in batch_status.get("tasks", []) if t.get("status") in ['searching', 'downloading', 'queued']]) + + debug_info[batch_id] = { + "reported_active": active_count, + "actual_active_tasks": active_tasks, + "max_concurrent": max_concurrent, + "total_tasks": task_count, + "worker_discrepancy": active_count != active_tasks + } + + response["debug_info"] = debug_info + print(f"๐Ÿ“Š [Batched Status] Returning status for {len(response['batches'])} batches") + + # Log worker discrepancies for debugging + discrepancies = [bid for bid, info in debug_info.items() if info.get("worker_discrepancy")] + if discrepancies: + print(f"โš ๏ธ [Batched Status] Worker count discrepancies in batches: {discrepancies}") + return jsonify(response) except Exception as e: @@ -5857,20 +6127,48 @@ def cancel_download_task(): # Immediately mark as cancelled to prevent race conditions task['status'] = 'cancelled' - # CRITICAL FIX: Only free worker slot if task is still being actively worked on - # Avoid double-decrementing active count for downloads that already started + # IMPROVED WORKER SLOT MANAGEMENT: Use batch state validation instead of task status batch_id = task.get('batch_id') - should_free_worker = False + worker_slot_freed = False if batch_id: - # Only free worker slot if task is pending/searching (worker hasn't completed yet) - # Tasks with status 'downloading'/'queued' have already had their worker freed - if current_status in ['pending', 'searching']: - should_free_worker = True - print(f"๐Ÿ”„ [Cancel] Task {task_id} (status: {current_status}) - freeing worker slot for batch {batch_id}") - _on_download_completed(batch_id, task_id, success=False) - else: - print(f"๐Ÿšซ [Cancel] Task {task_id} (status: {current_status}) - worker already completed, not freeing slot") + try: + # Check if we need to free a worker slot by examining batch state + with tasks_lock: + if batch_id in download_batches: + batch = download_batches[batch_id] + active_count = batch['active_count'] + + # Free worker slot if there are active workers and task was actively running + # This is more reliable than checking task status which can be inconsistent + if active_count > 0 and current_status in ['pending', 'searching', 'downloading', 'queued']: + print(f"๐Ÿ”„ [Cancel] Task {task_id} (status: {current_status}) - freeing worker slot for batch {batch_id}") + print(f"๐Ÿ”„ [Cancel] Active count before: {active_count}") + + # Use the completion callback with error handling + _on_download_completed(batch_id, task_id, success=False) + worker_slot_freed = True + + # Verify slot was actually freed + new_active = download_batches[batch_id]['active_count'] + print(f"๐Ÿ”„ [Cancel] Active count after: {new_active}") + + elif active_count == 0: + print(f"๐Ÿšซ [Cancel] Task {task_id} - no active workers to free") + else: + print(f"๐Ÿšซ [Cancel] Task {task_id} (status: {current_status}) - not actively running, no slot to free") + else: + print(f"๐Ÿšซ [Cancel] Task {task_id} - batch {batch_id} not found") + + except Exception as slot_error: + print(f"โŒ [Cancel] Error managing worker slot for {task_id}: {slot_error}") + # Attempt emergency recovery if normal completion failed + if not worker_slot_freed: + try: + print(f"๐Ÿšจ [Cancel] Attempting emergency worker slot recovery") + _recover_worker_slot(batch_id, task_id) + except Exception as recovery_error: + print(f"๐Ÿ’€ [Cancel] FATAL: Emergency recovery failed: {recovery_error}") else: print(f"๐Ÿšซ [Cancel] Task {task_id} cancelled (no batch_id - likely already completed)") diff --git a/webui/static/script.js b/webui/static/script.js index 79b18b41..3a6353d1 100644 --- a/webui/static/script.js +++ b/webui/static/script.js @@ -3422,6 +3422,8 @@ function updateTrackAnalysisResults(playlistId, results) { // ============================================================================ let globalDownloadStatusPoller = null; +let globalPollingFailureCount = 0; // Track consecutive failures for exponential backoff +let globalPollingBaseInterval = 2000; // Base polling interval in ms - MATCHES sync.py exactly function startGlobalDownloadPolling() { if (globalDownloadStatusPoller) { @@ -3475,13 +3477,121 @@ function startGlobalDownloadPolling() { processModalStatusUpdate(playlistId, statusData); }); + // ENHANCED: Reset failure count on successful polling + globalPollingFailureCount = 0; + } catch (error) { console.error('โŒ [Global Polling] Batched request failed:', error); - // Fallback: If batched request fails, don't break individual modals - // Individual error handling will be preserved in processModalStatusUpdate + // ENHANCED: Implement exponential backoff on failure + globalPollingFailureCount++; + + if (globalPollingFailureCount >= 5) { + console.error(`๐Ÿšจ [Global Polling] ${globalPollingFailureCount} consecutive failures, stopping poller`); + stopGlobalDownloadPolling(); + + // Try to restart after a delay + setTimeout(() => { + console.log('๐Ÿ”„ [Global Polling] Attempting to restart after failures'); + if (Object.keys(activeDownloadProcesses).length > 0) { + startGlobalDownloadPolling(); + } + }, 10000); // 10 second delay before restart + return; + } + + // Exponential backoff: increase interval temporarily + const backoffInterval = Math.min(globalPollingBaseInterval * Math.pow(2, globalPollingFailureCount - 1), 8000); + console.warn(`โš ๏ธ [Global Polling] Failure ${globalPollingFailureCount}/5, backing off to ${backoffInterval}ms`); + + // Temporarily adjust the polling interval + if (globalDownloadStatusPoller) { + clearInterval(globalDownloadStatusPoller); + globalDownloadStatusPoller = null; + + // Restart with backoff interval + setTimeout(() => { + if (Object.keys(activeDownloadProcesses).length > 0) { + startGlobalDownloadPollingWithInterval(backoffInterval); + } + }, backoffInterval); + } + } + }, globalPollingBaseInterval); // Use base interval initially +} + +function startGlobalDownloadPollingWithInterval(interval) { + if (globalDownloadStatusPoller) { + console.debug('๐Ÿ”„ [Global Polling] Already running, skipping start with interval'); + return; + } + + console.log(`๐Ÿ”„ [Global Polling] Starting with interval ${interval}ms`); + + // Use the exact same logic as startGlobalDownloadPolling but with custom interval + globalDownloadStatusPoller = setInterval(async () => { + const activeBatchIds = []; + const batchToPlaylistMap = {}; + + Object.entries(activeDownloadProcesses).forEach(([playlistId, process]) => { + if (process.batchId && process.status === 'running') { + activeBatchIds.push(process.batchId); + batchToPlaylistMap[process.batchId] = playlistId; + } + }); + + if (activeBatchIds.length === 0) { + console.log('๐Ÿ›‘ [Global Polling] No active processes, stopping global poller'); + stopGlobalDownloadPolling(); + return; + } + + try { + const queryParams = activeBatchIds.map(id => `batch_ids=${id}`).join('&'); + const response = await fetch(`/api/download_status/batch?${queryParams}`); + + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } + + const data = await response.json(); + console.debug(`๐Ÿ“Š [Global Polling] Received batched update for ${Object.keys(data.batches).length} processes`); + + Object.entries(data.batches).forEach(([batchId, statusData]) => { + const playlistId = batchToPlaylistMap[batchId]; + if (!playlistId || statusData.error) { + if (statusData.error) { + console.error(`โŒ [Global Polling] Error for batch ${batchId}:`, statusData.error); + } + return; + } + processModalStatusUpdate(playlistId, statusData); + }); + + // Success - reset to normal interval if we were backing off + globalPollingFailureCount = 0; + if (interval !== globalPollingBaseInterval) { + console.log('โœ… [Global Polling] Recovered from backoff, returning to normal interval'); + clearInterval(globalDownloadStatusPoller); + globalDownloadStatusPoller = null; + startGlobalDownloadPolling(); // Restart with normal interval + } + + } catch (error) { + console.error('โŒ [Global Polling] Request failed:', error); + globalPollingFailureCount++; + + if (globalPollingFailureCount >= 5) { + console.error(`๐Ÿšจ [Global Polling] Too many failures, stopping`); + stopGlobalDownloadPolling(); + setTimeout(() => { + if (Object.keys(activeDownloadProcesses).length > 0) { + startGlobalDownloadPolling(); + } + }, 10000); + } } - }, 1000); // 1 second polling (was 500ms individual = 2x less aggressive) + }, interval); } function stopGlobalDownloadPolling() { @@ -3506,6 +3616,18 @@ function processModalStatusUpdate(playlistId, data) { return; } + // ENHANCED: Validate response data to prevent UI corruption + if (!data || typeof data !== 'object') { + console.error(`โŒ [Status Update] Invalid data for ${playlistId}:`, data); + return; + } + + // ENHANCED: Validate task data structure + if (data.tasks && !Array.isArray(data.tasks)) { + console.error(`โŒ [Status Update] Invalid tasks data for ${playlistId} - not an array:`, data.tasks); + return; + } + console.debug(`๐Ÿ“Š [Status Update] Processing update for ${playlistId}: phase=${data.phase}, tasks=${(data.tasks || []).length}`); // Auto-show wishlist modal during active auto-processing @@ -3599,6 +3721,28 @@ function processModalStatusUpdate(playlistId, data) { } }); + // ENHANCED: Validate worker counts from server data + const serverActiveWorkers = data.active_count || 0; + const maxWorkers = data.max_concurrent || 3; + + // Count actual active workers based on task statuses + const clientActiveWorkers = (data.tasks || []).filter(task => + ['searching', 'downloading', 'queued'].includes(task.status) && + !document.querySelector(`tr[data-track-index="${task.track_index}"][data-locally-cancelled="true"]`) + ).length; + + // Log discrepancies for debugging + if (serverActiveWorkers !== clientActiveWorkers) { + console.warn(`๐Ÿ” [Worker Validation] ${playlistId}: server reports ${serverActiveWorkers} active, client sees ${clientActiveWorkers} active tasks`); + + // If server reports 0 but client sees active tasks, this might indicate ghost workers were fixed + if (serverActiveWorkers === 0 && clientActiveWorkers > 0) { + console.warn(`๐Ÿšจ [Worker Validation] Server reports 0 workers but client sees ${clientActiveWorkers} active tasks - potential UI desync`); + } + } + + console.debug(`๐Ÿ“Š [Worker Status] ${playlistId}: ${serverActiveWorkers}/${maxWorkers} active workers, ${clientActiveWorkers} client-side active tasks`); + const totalFinished = completedCount + failedOrCancelledCount; const progressPercent = missingCount > 0 ? (totalFinished / missingCount) * 100 : 0; document.getElementById(`download-progress-fill-${playlistId}`).style.width = `${progressPercent}%`;