diff --git a/web_server.py b/web_server.py index e6dce329..05a09cb0 100644 --- a/web_server.py +++ b/web_server.py @@ -120,6 +120,57 @@ download_tasks = {} # task_id -> task state dict download_batches = {} # batch_id -> {queue, active_count, max_concurrent} tasks_lock = threading.Lock() +# --- Shared Transfer Data Cache --- +# Cache transfer data to avoid hammering the Soulseek API with multiple concurrent modals +transfer_data_cache = { + 'data': {}, + 'last_update': 0, + 'update_lock': threading.Lock(), + 'cache_duration': 1.5 # Cache for 1.5 seconds to reduce API calls +} + +def get_cached_transfer_data(): + """ + Get transfer data with caching to reduce API calls when multiple modals are active. + Returns a lookup dictionary for efficient transfer matching. + """ + current_time = time.time() + + with transfer_data_cache['update_lock']: + # Check if cache is still valid + if (current_time - transfer_data_cache['last_update']) < transfer_data_cache['cache_duration']: + return transfer_data_cache['data'] + + # Cache expired or empty, fetch new data + live_transfers_lookup = {} + try: + transfers_data = asyncio.run(soulseek_client._make_request('GET', 'transfers/downloads')) + if transfers_data: + all_transfers = [] + for user_data in transfers_data: + username = user_data.get('username', 'Unknown') + if 'directories' in user_data: + for directory in user_data['directories']: + if 'files' in directory: + for file_info in directory['files']: + file_info['username'] = username + all_transfers.append(file_info) + for transfer in all_transfers: + key = f"{transfer.get('username')}::{os.path.basename(transfer.get('filename', ''))}" + live_transfers_lookup[key] = transfer + + # Update cache + transfer_data_cache['data'] = live_transfers_lookup + transfer_data_cache['last_update'] = current_time + + except Exception as e: + print(f"⚠️ Could not fetch live transfers (cached): {e}") + # Return empty dict on error, but don't update cache timestamp + # This way we'll retry on the next request + return {} + + return live_transfers_lookup + # --- Background Download Monitoring (GUI Parity) --- class WebUIDownloadMonitor: """ @@ -3353,6 +3404,93 @@ def _on_download_completed(batch_id, task_id, success=True): print(f"πŸ”„ [Batch Manager] Starting next batch for {batch_id}") _start_next_batch_of_downloads(batch_id) +def _run_full_missing_tracks_process(batch_id, playlist_id, tracks_json): + """ + A master worker that handles the entire missing tracks process: + 1. Runs the analysis. + 2. If missing tracks are found, it automatically queues them for download. + """ + try: + # PHASE 1: ANALYSIS + with tasks_lock: + if batch_id in download_batches: + download_batches[batch_id]['phase'] = 'analysis' + download_batches[batch_id]['analysis_total'] = len(tracks_json) + download_batches[batch_id]['analysis_processed'] = 0 + + from database.music_database import MusicDatabase + db = MusicDatabase() + active_server = config_manager.get_active_media_server() + analysis_results = [] + + for i, track_data in enumerate(tracks_json): + track_name = track_data.get('name', '') + artists = track_data.get('artists', []) + found, confidence = False, 0.0 + + for artist in artists: + artist_name = artist if isinstance(artist, str) else str(artist) + db_track, track_confidence = db.check_track_exists( + track_name, artist_name, confidence_threshold=0.7, server_source=active_server + ) + if db_track and track_confidence >= 0.7: + found, confidence = True, track_confidence + break + + analysis_results.append({ + 'track_index': i, 'track': track_data, 'found': found, 'confidence': confidence + }) + + with tasks_lock: + if batch_id in download_batches: + download_batches[batch_id]['analysis_processed'] = i + 1 + # Store incremental results for live updates + download_batches[batch_id]['analysis_results'] = analysis_results.copy() + + missing_tracks = [res for res in analysis_results if not res['found']] + + with tasks_lock: + if batch_id in download_batches: + download_batches[batch_id]['analysis_results'] = analysis_results + + # PHASE 2: TRANSITION TO DOWNLOAD (if necessary) + if not missing_tracks: + print(f"βœ… Analysis for batch {batch_id} complete. No missing tracks.") + with tasks_lock: + if batch_id in download_batches: + download_batches[batch_id]['phase'] = 'complete' + return + + print(f" transitioning batch {batch_id} to download phase with {len(missing_tracks)} tracks.") + + with tasks_lock: + if batch_id not in download_batches: return + + download_batches[batch_id]['phase'] = 'downloading' + + for res in missing_tracks: + task_id = str(uuid.uuid4()) + download_tasks[task_id] = { + 'status': 'pending', 'track_info': res['track'], + 'playlist_id': playlist_id, 'batch_id': batch_id, + 'track_index': res['track_index'], 'retry_count': 0, + 'cached_candidates': [], 'used_sources': set(), + 'status_change_time': time.time() + } + download_batches[batch_id]['queue'].append(task_id) + + download_monitor.start_monitoring(batch_id) + _start_next_batch_of_downloads(batch_id) + + except Exception as e: + print(f"❌ Master worker for batch {batch_id} failed: {e}") + import traceback + traceback.print_exc() + with tasks_lock: + if batch_id in download_batches: + download_batches[batch_id]['phase'] = 'error' + download_batches[batch_id]['error'] = str(e) + def _download_track_worker(task_id, batch_id=None): """ Enhanced download worker that matches the GUI's exact retry logic. @@ -3728,94 +3866,88 @@ def start_playlist_missing_downloads(playlist_id): print(f"❌ Error starting missing downloads: {e}") return jsonify({"success": False, "error": str(e)}), 500 +@app.route('/api/active-processes', methods=['GET']) +def get_active_processes(): + """ + This endpoint now only needs to check for active download batches, + as the analysis phase is now part of the batch process. + """ + active_processes = [] + with tasks_lock: + for batch_id, batch_data in download_batches.items(): + if batch_data.get('phase') not in ['complete', 'error', 'cancelled']: + active_processes.append({ + "type": "batch", + "playlist_id": batch_data.get('playlist_id'), + "playlist_name": batch_data.get('playlist_name'), + "batch_id": batch_id, + "phase": batch_data.get('phase') + }) + return jsonify({"active_processes": active_processes}) + @app.route('/api/playlists//download_status', methods=['GET']) def get_batch_download_status(batch_id): """ - Returns real-time status for all tasks in a batch. This version correctly - parses the live 'state' from slskd to report 'completed' status, fixing - the UI issue where downloads would get stuck at 100%. + Returns real-time status for a batch, now including the + current phase (analysis, downloading, etc.) and analysis progress. """ try: - # --- Fetch live transfer data from slskd --- - live_transfers_lookup = {} - try: - transfers_data = asyncio.run(soulseek_client._make_request('GET', 'transfers/downloads')) - if transfers_data: - all_transfers = [] - for user_data in transfers_data: - username = user_data.get('username', 'Unknown') - if 'directories' in user_data: - for directory in user_data['directories']: - if 'files' in directory: - for file_info in directory['files']: - file_info['username'] = username - all_transfers.append(file_info) - - # Create a lookup dictionary using a reliable composite key - for transfer in all_transfers: - key = f"{transfer.get('username')}::{os.path.basename(transfer.get('filename', ''))}" - live_transfers_lookup[key] = transfer - except Exception as e: - print(f"⚠️ Could not fetch live transfers for modal status: {e}") + # Use cached transfer data to reduce API calls with multiple concurrent modals + live_transfers_lookup = get_cached_transfer_data() - # --- Process tasks and enrich with live data --- with tasks_lock: - batch_tasks = [] if batch_id not in download_batches: - return jsonify({"tasks": []}) + return jsonify({"error": "Batch not found"}), 404 - for task_id in download_batches[batch_id].get('queue', []): - task = download_tasks.get(task_id) - if not task: - continue + batch = download_batches[batch_id] + response_data = { + "phase": batch.get('phase', 'unknown'), + "error": batch.get('error') + } - task_status = { - 'task_id': task_id, - 'track_index': task['track_index'], - 'status': task['status'], - 'track_info': task['track_info'], - 'progress': 0 + if response_data["phase"] == 'analysis': + response_data['analysis_progress'] = { + 'total': batch.get('analysis_total', 0), + 'processed': batch.get('analysis_processed', 0) } + response_data['analysis_results'] = batch.get('analysis_results', []) - # --- USE THE RELIABLE KEY FOR MATCHING --- - task_filename = task.get('filename') or task['track_info'].get('filename') - task_username = task.get('username') or task['track_info'].get('username') - - if task_filename and task_username: - lookup_key = f"{task_username}::{os.path.basename(task_filename)}" - - if lookup_key in live_transfers_lookup: - live_info = live_transfers_lookup[lookup_key] - - # --- THIS IS THE KEY FIX --- - # Correctly parse the live state string from the API - state_str = live_info.get('state', 'Unknown') - - if 'Completed' in state_str or 'Succeeded' in state_str: - task_status['status'] = 'completed' - elif 'Cancelled' in state_str or 'Canceled' in state_str: - task_status['status'] = 'cancelled' - elif 'Failed' in state_str or 'Errored' in state_str: - task_status['status'] = 'failed' - elif 'InProgress' in state_str: - task_status['status'] = 'downloading' - else: - task_status['status'] = 'queued' - # --- END OF FIX --- - - task_status['progress'] = live_info.get('percentComplete', 0) - print(f"πŸ”§ [Status API] Live Update for Task {task_id}: Status '{task_status['status']}', Progress {task_status['progress']}%") - - batch_tasks.append(task_status) + elif response_data["phase"] in ['downloading', 'complete', 'error']: + response_data['analysis_results'] = batch.get('analysis_results', []) + batch_tasks = [] + for task_id in batch.get('queue', []): + task = download_tasks.get(task_id) + if not task: continue + + task_status = { + 'task_id': task_id, + 'track_index': task['track_index'], + 'status': task['status'], + 'track_info': task['track_info'], + 'progress': 0 + } + task_filename = task.get('filename') or task['track_info'].get('filename') + task_username = task.get('username') or task['track_info'].get('username') + if task_filename and task_username: + lookup_key = f"{task_username}::{os.path.basename(task_filename)}" + if lookup_key in live_transfers_lookup: + live_info = live_transfers_lookup[lookup_key] + state_str = live_info.get('state', 'Unknown') + if 'Completed' in state_str or 'Succeeded' in state_str: task_status['status'] = 'completed' + elif 'Cancelled' in state_str or 'Canceled' in state_str: task_status['status'] = 'cancelled' + elif 'Failed' in state_str or 'Errored' in state_str: task_status['status'] = 'failed' + elif 'InProgress' in state_str: task_status['status'] = 'downloading' + else: task_status['status'] = 'queued' + task_status['progress'] = live_info.get('percentComplete', 0) + batch_tasks.append(task_status) + batch_tasks.sort(key=lambda x: x['track_index']) + response_data['tasks'] = batch_tasks + + return jsonify(response_data) - batch_tasks.sort(key=lambda x: x['track_index']) - - return jsonify({"tasks": batch_tasks}) - except Exception as e: import traceback traceback.print_exc() - print(f"❌ Error getting batch status: {e}") return jsonify({"error": str(e)}), 500 @app.route('/api/downloads/cancel_task', methods=['POST']) @@ -3917,159 +4049,86 @@ def cancel_download_task(): except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 -# =============================== -# == TRACK ANALYSIS API == -# =============================== - -# Global state for track analysis tasks -analysis_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="AnalysisWorker") -analysis_tasks = {} # task_id -> analysis state -analysis_lock = threading.Lock() - -def _run_track_analysis_task(task_id, tracks_json): - """Run track analysis in background thread (same logic as GUI's PlaylistTrackAnalysisWorker)""" - import uuid - from database.music_database import MusicDatabase - from config.settings import config_manager - - print(f"πŸ” Starting track analysis task {task_id} for {len(tracks_json)} tracks") - +@app.route('/api/playlists//cancel_batch', methods=['POST']) +def cancel_batch(batch_id): + """ + Cancels an entire batch - useful for cancelling during analysis phase + or cancelling all downloads at once. + """ try: - # Initialize database connection - db = MusicDatabase() - active_server = config_manager.get_active_media_server() - - results = [] - total_tracks = len(tracks_json) - - for i, track_data in enumerate(tracks_json): - with analysis_lock: - # Check if task was cancelled - if analysis_tasks.get(task_id, {}).get('status') == 'cancelled': - print(f"❌ Analysis task {task_id} was cancelled") - return - - track_name = track_data.get('name', '') - artists = track_data.get('artists', []) + with tasks_lock: + if batch_id not in download_batches: + return jsonify({"success": False, "error": "Batch not found"}), 404 - # Try each artist for matching (same as GUI logic) - found = False - confidence = 0.0 + # Mark batch as cancelled + download_batches[batch_id]['phase'] = 'cancelled' - for artist in artists: - artist_name = artist if isinstance(artist, str) else str(artist) - - # Check database for track existence - db_track, track_confidence = db.check_track_exists( - track_name, artist_name, - confidence_threshold=0.7, - server_source=active_server - ) - - if db_track and track_confidence >= 0.7: - found = True - confidence = track_confidence - print(f"βœ… Found: '{track_name}' by {artist_name} (confidence: {confidence:.2f})") - break + # Cancel all individual tasks in the batch + cancelled_count = 0 + for task_id in download_batches[batch_id].get('queue', []): + if task_id in download_tasks: + task = download_tasks[task_id] + if task['status'] not in ['completed', 'cancelled']: + task['status'] = 'cancelled' + cancelled_count += 1 - if not found: - print(f"❌ Missing: '{track_name}' by {artists}") + print(f"βœ… Cancelled batch {batch_id} with {cancelled_count} tasks") + return jsonify({"success": True, "cancelled_tasks": cancelled_count}) - # Store result - result = { - 'track_index': i, - 'track': track_data, - 'found': found, - 'confidence': confidence - } - results.append(result) - - # Update progress - progress = int((i + 1) / total_tracks * 100) - with analysis_lock: - if task_id in analysis_tasks: - analysis_tasks[task_id].update({ - 'progress': progress, - 'processed': i + 1, - 'results': results.copy() # Store current results - }) - - # Mark as complete - with analysis_lock: - if task_id in analysis_tasks: - analysis_tasks[task_id].update({ - 'status': 'complete', - 'progress': 100, - 'results': results, - 'total_found': len([r for r in results if r['found']]), - 'total_missing': len([r for r in results if not r['found']]) - }) - - print(f"βœ… Analysis complete: {len([r for r in results if r['found']])} found, {len([r for r in results if not r['found']])} missing") - except Exception as e: - print(f"❌ Analysis task {task_id} failed: {e}") - with analysis_lock: - if task_id in analysis_tasks: - analysis_tasks[task_id].update({ - 'status': 'error', - 'error': str(e) - }) + print(f"❌ Error cancelling batch {batch_id}: {e}") + return jsonify({"success": False, "error": str(e)}), 500 + +# =============================== +# == UNIFIED MISSING TRACKS API == +# =============================== -@app.route('/api/tracks/analyze', methods=['POST']) -def start_track_analysis(): - """Start track analysis to check which tracks exist in media server library""" +@app.route('/api/playlists//start-missing-process', methods=['POST']) +def start_missing_tracks_process(playlist_id): + """ + A single, robust endpoint to kick off the entire missing tracks workflow. + It creates a batch and starts the master worker in the background. + """ data = request.get_json() tracks = data.get('tracks', []) - + playlist_name = data.get('playlist_name', 'Unknown Playlist') + if not tracks: return jsonify({"success": False, "error": "No tracks provided"}), 400 - - # Generate unique task ID - import uuid - task_id = str(uuid.uuid4()) - - # Initialize task state - with analysis_lock: - analysis_tasks[task_id] = { - 'status': 'running', - 'progress': 0, - 'total': len(tracks), - 'processed': 0, - 'results': [], - 'total_found': 0, - 'total_missing': 0 + + # Limit concurrent analysis processes to prevent resource exhaustion + with tasks_lock: + active_analysis_count = sum(1 for batch in download_batches.values() + if batch.get('phase') == 'analysis') + if active_analysis_count >= 3: # Allow max 3 concurrent analysis processes + return jsonify({ + "success": False, + "error": "Too many analysis processes running. Please wait for one to complete." + }), 429 + + batch_id = str(uuid.uuid4()) + + with tasks_lock: + download_batches[batch_id] = { + 'phase': 'analysis', + 'playlist_id': playlist_id, + 'playlist_name': playlist_name, + 'queue': [], + 'active_count': 0, + 'max_concurrent': 3, + 'queue_index': 0, + 'analysis_total': len(tracks), + 'analysis_processed': 0, + 'analysis_results': [] } - - # Submit analysis task - future = analysis_executor.submit(_run_track_analysis_task, task_id, tracks) - + + missing_download_executor.submit(_run_full_missing_tracks_process, batch_id, playlist_id, tracks) + return jsonify({ "success": True, - "task_id": task_id, - "total_tracks": len(tracks) + "batch_id": batch_id }) -@app.route('/api/tracks/analyze/status/', methods=['GET']) -def get_analysis_status(task_id): - """Get status of track analysis task""" - with analysis_lock: - task = analysis_tasks.get(task_id) - if not task: - return jsonify({"error": "Task not found"}), 404 - - return jsonify(task) - -@app.route('/api/tracks/analyze/cancel/', methods=['POST']) -def cancel_analysis_task(task_id): - """Cancel a running analysis task""" - with analysis_lock: - if task_id in analysis_tasks: - analysis_tasks[task_id]['status'] = 'cancelled' - return jsonify({"success": True, "message": "Task cancelled"}) - else: - return jsonify({"success": False, "error": "Task not found"}), 404 - @app.route('/api/tracks/download_missing', methods=['POST']) def start_missing_downloads(): """Legacy endpoint - redirect to new playlist-based endpoint""" diff --git a/webui/static/script.js b/webui/static/script.js index 7fa75a4b..1e6df5c5 100644 --- a/webui/static/script.js +++ b/webui/static/script.js @@ -1398,6 +1398,53 @@ async function loadSyncData() { } } +async function checkForActiveProcesses() { + try { + const response = await fetch('/api/active-processes'); + if (!response.ok) return; + + const data = await response.json(); + const processes = data.active_processes || []; + + if (processes.length > 0) { + console.log(`πŸ”„ Found ${processes.length} active process(es) from backend. Rehydrating UI...`); + for (const processInfo of processes) { + if (!activeDownloadProcesses[processInfo.playlist_id]) { + rehydrateModal(processInfo); + } + } + } + } catch (error) { + console.error('Failed to check for active processes:', error); + } +} + +async function rehydrateModal(processInfo) { + const { playlist_id, playlist_name, batch_id } = processInfo; + console.log(`πŸ’§ Rehydrating modal for playlist "${playlist_name}" (batch: ${batch_id})`); + + let playlistData = spotifyPlaylists.find(p => p.id === playlist_id); + if (!playlistData) { + console.warn(`Cannot rehydrate modal: Playlist data for ${playlist_id} not loaded.`); + return; + } + await openDownloadMissingModal(playlist_id); + const process = activeDownloadProcesses[playlist_id]; + if (!process) return; + + process.status = 'running'; + process.batchId = batch_id; + updatePlaylistCardUI(playlist_id); + updateRefreshButtonState(); + + document.getElementById(`begin-analysis-btn-${playlist_id}`).style.display = 'none'; + document.getElementById(`cancel-all-btn-${playlist_id}`).style.display = 'inline-block'; + + startModalDownloadPolling(playlist_id); + + process.modalElement.style.display = 'none'; +} + async function loadSpotifyPlaylists() { const container = document.getElementById('spotify-playlist-container'); const refreshBtn = document.getElementById('spotify-refresh-btn'); @@ -1415,6 +1462,9 @@ async function loadSpotifyPlaylists() { spotifyPlaylists = await response.json(); renderSpotifyPlaylists(); spotifyPlaylistsLoaded = true; + + await checkForActiveProcesses(); + } catch (error) { container.innerHTML = `
❌ Error: ${error.message}
`; showToast(`Error loading playlists: ${error.message}`, 'error'); @@ -1675,6 +1725,7 @@ async function openDownloadMissingModal(playlistId) { // **NEW**: Check if a process is already active for this playlist if (activeDownloadProcesses[playlistId]) { console.log(`Modal for ${playlistId} already exists. Showing it.`); + closePlaylistDetailsModal(); // Close playlist details modal even when reusing existing modal const process = activeDownloadProcesses[playlistId]; if (process.modalElement) { process.modalElement.style.display = 'flex'; @@ -1809,7 +1860,7 @@ async function openDownloadMissingModal(playlistId) {