From 2ede3ddee71195ea8187d2801c63303b70c81997 Mon Sep 17 00:00:00 2001 From: Broque Thomas Date: Tue, 13 Jan 2026 09:42:07 -0800 Subject: [PATCH] Improve wishlist and watchlist auto-scheduling robustness Refactored wishlist and watchlist auto-processing scheduling to use atomic timer updates and retry logic, preventing stuck timers and improving reliability. Removed excessive debug logging and improved error handling throughout related functions. Added checks to prevent concurrent wishlist processing and ensured timer state is managed consistently. --- web_server.py | 207 +++++++++++++++++++++++++++++++------------------- 1 file changed, 130 insertions(+), 77 deletions(-) diff --git a/web_server.py b/web_server.py index 53164fb5..f709c845 100644 --- a/web_server.py +++ b/web_server.py @@ -3845,10 +3845,8 @@ def _find_completed_file_robust(download_dir, api_filename, transfer_dir=None): # YOUTUBE SUPPORT: Handle encoded filename format "video_id||title" # Extract just the title part for file matching if '||' in api_filename: - print(f"🎵 Detected YouTube encoded filename: {api_filename}") _, title = api_filename.split('||', 1) api_filename = title # Use just the title for file searching - print(f"🎵 Extracted title for search: {api_filename}") def normalize_for_finding(text: str) -> str: """A powerful normalization function adapted from matching_engine.py.""" @@ -3864,8 +3862,6 @@ def _find_completed_file_robust(download_dir, api_filename, transfer_dir=None): def search_in_directory(search_dir, location_name): """Search for the file in a specific directory.""" - print(f" searching for normalized filename '{normalized_target}' in '{search_dir}'...") - best_match_path = None highest_similarity = 0.0 @@ -3875,7 +3871,7 @@ def _find_completed_file_robust(download_dir, api_filename, transfer_dir=None): # Direct match is the best case if os.path.basename(file) == target_basename: file_path = os.path.join(root, file) - print(f"Found exact match: {file_path}") + print(f"✅ Found exact match in {location_name}: {file_path}") return file_path, 1.0 # Fuzzy matching for variations @@ -3898,27 +3894,22 @@ def _find_completed_file_robust(download_dir, api_filename, transfer_dir=None): # Use a high confidence threshold for fuzzy matches to prevent false positives if downloads_similarity > 0.85: location = 'downloads' - if downloads_similarity == 1.0: - print(f"✅ Found exact match in downloads: {best_downloads_path}") - else: - print(f"✅ Found best fuzzy match in downloads with similarity {downloads_similarity:.2f}: {best_downloads_path}") + if downloads_similarity < 1.0: + print(f"✅ Found fuzzy match in downloads ({downloads_similarity:.2f}): {best_downloads_path}") return (best_downloads_path, location) - + # If not found in downloads and transfer_dir is provided, search there transfer_similarity = 0.0 # Initialize transfer_similarity if transfer_dir and os.path.exists(transfer_dir): - print(f"🔍 File not found in downloads, checking transfer folder...") best_transfer_path, transfer_similarity = search_in_directory(transfer_dir, 'transfer') if transfer_similarity > 0.85: location = 'transfer' - if transfer_similarity == 1.0: - print(f"✅ Found exact match in transfer: {best_transfer_path}") - else: - print(f"✅ Found best fuzzy match in transfer with similarity {transfer_similarity:.2f}: {best_transfer_path}") + if transfer_similarity < 1.0: + print(f"✅ Found fuzzy match in transfer ({transfer_similarity:.2f}): {best_transfer_path}") return (best_transfer_path, location) - - print(f"❌ Could not find a 
confident match for '{target_basename}' in any location. Best similarity was {max(downloads_similarity, transfer_similarity):.2f}.") + + # Don't spam logs - file not found is common for completed/processed downloads return (None, None) @@ -4046,16 +4037,11 @@ def get_download_status(): processing_thread.start() # Also include YouTube downloads in the response - print(f"🔍 [Status] Starting YouTube downloads check...") try: all_youtube_downloads = asyncio.run(soulseek_client.get_all_downloads()) - print(f"🔍 [Debug] Fetched {len(all_youtube_downloads)} total downloads from orchestrator") - youtube_count = sum(1 for d in all_youtube_downloads if d.username == 'youtube') - print(f"🔍 [Debug] Found {youtube_count} YouTube downloads") for download in all_youtube_downloads: if download.username == 'youtube': - print(f"🔍 [Debug] Processing YouTube download: {download.id}, state: {download.state}") # Convert DownloadStatus to transfer format that frontend expects youtube_transfer = { 'id': download.id, @@ -4073,14 +4059,9 @@ def get_download_status(): # Check if YouTube download is completed and needs post-processing if download.state and ('succeeded' in download.state.lower() or 'completed' in download.state.lower()): context_key = f"{download.username}::{extract_filename(download.filename)}" - print(f"🔍 [YouTube] Checking completed download - Context key: {context_key}") - print(f"🔍 [YouTube] Download state: {download.state}") with matched_context_lock: context = matched_downloads_context.get(context_key) - print(f"🔍 [YouTube] Context found: {context is not None}") - if not context: - print(f"🔍 [YouTube] Available context keys: {list(matched_downloads_context.keys())}") if context and context_key not in _processed_download_ids: download_dir = docker_resolve_path(config_manager.get('soulseek.download_path', './downloads')) @@ -4104,6 +4085,10 @@ def get_download_status(): processing_thread = threading.Thread(target=process_youtube_download) processing_thread.daemon = True processing_thread.start() + else: + # File not found - likely already processed and moved to library + # Mark as processed to prevent infinite checking + _processed_download_ids.add(context_key) except Exception as yt_error: import traceback print(f"⚠️ Could not fetch YouTube downloads for status: {yt_error}") @@ -8820,11 +8805,9 @@ def _sanitize_track_data_for_processing(track_data): raw_album = sanitized.get('album', '') if isinstance(raw_album, dict) and 'name' in raw_album: sanitized['album'] = raw_album['name'] - print(f"🔧 [Sanitize] Converted album from dict to string: '{raw_album['name']}'") elif not isinstance(raw_album, str): sanitized['album'] = str(raw_album) - print(f"🔧 [Sanitize] Converted album to string: '{sanitized['album']}'") - + # Handle artists field - ensure it's a list of strings raw_artists = sanitized.get('artists', []) if isinstance(raw_artists, list): @@ -8834,10 +8817,8 @@ def _sanitize_track_data_for_processing(track_data): processed_artists.append(artist) elif isinstance(artist, dict) and 'name' in artist: processed_artists.append(artist['name']) - print(f"🔧 [Sanitize] Converted artist from dict to string: '{artist['name']}'") else: processed_artists.append(str(artist)) - print(f"🔧 [Sanitize] Converted artist to string: '{str(artist)}'") sanitized['artists'] = processed_artists else: print(f"⚠️ [Sanitize] Unexpected artists format: {type(raw_artists)}") @@ -8970,16 +8951,55 @@ def stop_wishlist_auto_processing(): wishlist_auto_processing_timestamp = 0 wishlist_next_run_time = 0 # Clear countdown timer 
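
For context on the matching hunk near the top of this patch: _find_completed_file_robust normalizes the target filename, walks the directory tree, short-circuits on an exact basename match, otherwise keeps the highest fuzzy similarity, and only accepts a fuzzy hit above 0.85. The minimal sketch below isolates that flow. Only the exact-match short-circuit, the 0.85 cutoff, and the normalize_for_finding name come from the patch; the normalization body, the difflib choice, and the find_best_match helper are illustrative assumptions, not code from web_server.py.

    # Sketch of the normalize-and-compare matching behind _find_completed_file_robust.
    # The 0.85 threshold and exact-basename short-circuit mirror the hunk above;
    # everything else here is an assumption for illustration.
    import os
    import re
    import unicodedata
    from difflib import SequenceMatcher

    def normalize_for_finding(text: str) -> str:
        """Lowercase, strip accents and punctuation so near-variants compare equal."""
        text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('ascii')
        return re.sub(r'[^a-z0-9]+', ' ', text.lower()).strip()

    def find_best_match(search_dir: str, target_basename: str, threshold: float = 0.85):
        """Return (path, similarity) for the best candidate, or (None, best_seen)."""
        normalized_target = normalize_for_finding(target_basename)
        best_path, best_score = None, 0.0
        for root, _dirs, files in os.walk(search_dir):
            for name in files:
                if name == target_basename:  # exact match wins outright
                    return os.path.join(root, name), 1.0
                score = SequenceMatcher(None, normalized_target,
                                        normalize_for_finding(name)).ratio()
                if score > best_score:
                    best_path, best_score = os.path.join(root, name), score
        # Enforce a high confidence threshold so fuzzy hits cannot produce false positives.
        return (best_path, best_score) if best_score > threshold else (None, best_score)
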
-def schedule_next_wishlist_processing():
-    """Schedule next automatic wishlist processing in 30 minutes."""
+def schedule_next_wishlist_processing(retry_count=0, max_retries=3):
+    """
+    Schedule next automatic wishlist processing in 30 minutes.
+    Includes retry logic and atomic timer updates so the countdown cannot stick at "0s".
+
+    Args:
+        retry_count: Current retry attempt (internal use)
+        max_retries: Maximum number of retry attempts
+    """
     global wishlist_auto_timer, wishlist_next_run_time
 
-    with wishlist_timer_lock:
-        print("⏰ Scheduling next automatic wishlist processing in 30 minutes")
-        wishlist_next_run_time = time.time() + 1800.0  # Set timestamp for countdown display
-        wishlist_auto_timer = threading.Timer(1800.0, _process_wishlist_automatically)  # 30 minutes (1800 seconds)
-        wishlist_auto_timer.daemon = True
-        wishlist_auto_timer.start()
+    try:
+        with wishlist_timer_lock:
+            # Cancel existing timer if present (prevent orphaned timers)
+            if wishlist_auto_timer is not None:
+                try:
+                    wishlist_auto_timer.cancel()
+                except Exception as cancel_error:
+                    print(f"⚠️ Failed to cancel old wishlist timer: {cancel_error}")
+
+            # Calculate next run time BEFORE creating timer
+            next_time = time.time() + 1800.0  # 30 minutes
+
+            # Create and start new timer
+            new_timer = threading.Timer(1800.0, _process_wishlist_automatically)
+            new_timer.daemon = True
+            new_timer.start()
+
+            # Only update globals AFTER successful timer creation and start
+            wishlist_next_run_time = next_time
+            wishlist_auto_timer = new_timer
+
+            print("⏰ Scheduled next wishlist processing in 30 minutes")
+
+    except Exception as e:
+        print(f"❌ [CRITICAL] Failed to schedule wishlist processing (attempt {retry_count + 1}/{max_retries}): {e}")
+        import traceback
+        traceback.print_exc()
+
+        # Retry with exponential backoff
+        if retry_count < max_retries:
+            retry_delay = 5 * (2 ** retry_count)  # 5s, 10s, 20s
+            print(f"🔄 Retrying wishlist scheduling in {retry_delay} seconds...")
+            retry_timer = threading.Timer(retry_delay, lambda: schedule_next_wishlist_processing(retry_count + 1, max_retries))
+            retry_timer.daemon = True
+            retry_timer.start()
+        else:
+            print(f"❌ [FATAL] Failed to schedule wishlist processing after {max_retries} attempts!")
+            print("⚠️ MANUAL INTERVENTION REQUIRED - Wishlist auto-processing will not run!")
 
 def _process_wishlist_automatically():
     """Main automatic processing logic that runs in background thread."""
@@ -9035,19 +9055,21 @@ def _process_wishlist_automatically():
             with wishlist_timer_lock:
                 wishlist_auto_processing = False
                 wishlist_auto_processing_timestamp = 0
-                wishlist_next_run_time = 0  # Clear old timer before rescheduling
-            schedule_next_wishlist_processing()
+                # Don't clear wishlist_next_run_time - let schedule function handle atomically
+            schedule_next_wishlist_processing()  # Has built-in error handling and retry
             return
 
         print(f"🎵 [Auto-Wishlist] Found {count} tracks in wishlist, starting automatic processing...")
 
-        # Check if wishlist processing is already active
+        # Check if wishlist processing is already active (auto or manual)
         playlist_id = "wishlist"
        with tasks_lock:
             for batch_id, batch_data in download_batches.items():
-                if (batch_data.get('playlist_id') == playlist_id and
+                batch_playlist_id = batch_data.get('playlist_id')
+                # Check for both auto ('wishlist') and manual ('wishlist_manual') batches
+                if (batch_playlist_id in ['wishlist', 'wishlist_manual'] and
                     batch_data.get('phase') not in ['complete', 'error', 'cancelled']):
-                    print("⚠️ Wishlist processing already active in another batch, skipping 
automatic start") + print(f"⚠️ Wishlist processing already active in another batch ({batch_playlist_id}), skipping automatic start") with wishlist_timer_lock: wishlist_auto_processing = False schedule_next_wishlist_processing() @@ -9281,15 +9303,10 @@ def _process_wishlist_automatically(): with wishlist_timer_lock: wishlist_auto_processing = False wishlist_auto_processing_timestamp = 0 - wishlist_next_run_time = 0 # Clear old timer before rescheduling + # Don't clear wishlist_next_run_time - let schedule function handle atomically - # CRITICAL: Wrap rescheduling in try/except to prevent timer thread death - try: - schedule_next_wishlist_processing() - except Exception as schedule_error: - print(f"❌ [CRITICAL] Failed to schedule next wishlist processing: {schedule_error}") - import traceback - traceback.print_exc() + # Reschedule next cycle (has built-in error handling and retry logic) + schedule_next_wishlist_processing() # =============================== # == DATABASE UPDATER API == @@ -9731,6 +9748,13 @@ def start_wishlist_missing_downloads(): identical to playlist processing, maintaining exactly 3 concurrent downloads. """ try: + # Check if auto-processing is currently running (prevent concurrent wishlist access) + if is_wishlist_actually_processing(): + return jsonify({ + "error": "Wishlist auto-processing is currently running. Please wait for it to complete.", + "retry_after": 30 + }), 409 + data = request.get_json() or {} force_download_all = data.get('force_download_all', False) category = data.get('category') # Get category filter (albums or singles) @@ -11378,16 +11402,11 @@ def _process_failed_tracks_to_wishlist_exact_with_auto_completion(batch_id): with wishlist_timer_lock: wishlist_auto_processing = False wishlist_auto_processing_timestamp = 0 - wishlist_next_run_time = 0 # Clear old timer before rescheduling + # Don't clear wishlist_next_run_time here - let schedule function handle it atomically - # Schedule next automatic processing cycle + # Schedule next automatic processing cycle (handles timer atomically with retry logic) print("⏰ [Auto-Wishlist] Scheduling next automatic cycle in 30 minutes") - try: - schedule_next_wishlist_processing() - except Exception as schedule_error: - print(f"❌ [CRITICAL] Failed to schedule next wishlist processing: {schedule_error}") - import traceback - traceback.print_exc() + schedule_next_wishlist_processing() # Has built-in error handling and retry return completion_summary @@ -11400,16 +11419,11 @@ def _process_failed_tracks_to_wishlist_exact_with_auto_completion(batch_id): with wishlist_timer_lock: wishlist_auto_processing = False wishlist_auto_processing_timestamp = 0 - wishlist_next_run_time = 0 # Clear old timer before rescheduling + # Don't clear wishlist_next_run_time here - let schedule function handle it atomically - # Schedule next cycle even after error to maintain continuity + # Schedule next cycle even after error to maintain continuity (has built-in retry logic) print("⏰ [Auto-Wishlist] Scheduling next cycle after error (30 minutes)") - try: - schedule_next_wishlist_processing() - except Exception as schedule_error: - print(f"❌ [CRITICAL] Failed to schedule next wishlist processing: {schedule_error}") - import traceback - traceback.print_exc() + schedule_next_wishlist_processing() # Has built-in error handling and retry return {'tracks_added': 0, 'errors': 1, 'total_failed': 0} @@ -17665,16 +17679,55 @@ def stop_watchlist_auto_scanning(): watchlist_auto_scanning = False watchlist_auto_scanning_timestamp = 0 -def 
schedule_next_watchlist_scan():
-    """Schedule next automatic watchlist scan in 24 hours."""
+def schedule_next_watchlist_scan(retry_count=0, max_retries=3):
+    """
+    Schedule next automatic watchlist scan in 24 hours.
+    Includes retry logic and atomic timer updates so the countdown cannot stick at "0s".
+
+    Args:
+        retry_count: Current retry attempt (internal use)
+        max_retries: Maximum number of retry attempts
+    """
     global watchlist_auto_timer, watchlist_next_run_time
 
-    with watchlist_timer_lock:
-        print("⏰ Scheduling next automatic watchlist scan in 24 hours")
-        watchlist_next_run_time = time.time() + 86400.0  # Set timestamp for countdown display
-        watchlist_auto_timer = threading.Timer(86400.0, _process_watchlist_scan_automatically)  # 24 hours
-        watchlist_auto_timer.daemon = True
-        watchlist_auto_timer.start()
+    try:
+        with watchlist_timer_lock:
+            # Cancel existing timer if present (prevent orphaned timers)
+            if watchlist_auto_timer is not None:
+                try:
+                    watchlist_auto_timer.cancel()
+                except Exception as cancel_error:
+                    print(f"⚠️ Failed to cancel old watchlist timer: {cancel_error}")
+
+            # Calculate next run time BEFORE creating timer
+            next_time = time.time() + 86400.0  # 24 hours
+
+            # Create and start new timer
+            new_timer = threading.Timer(86400.0, _process_watchlist_scan_automatically)
+            new_timer.daemon = True
+            new_timer.start()
+
+            # Only update globals AFTER successful timer creation and start
+            watchlist_next_run_time = next_time
+            watchlist_auto_timer = new_timer
+
+            print("⏰ Scheduled next watchlist scan in 24 hours")
+
+    except Exception as e:
+        print(f"❌ [CRITICAL] Failed to schedule watchlist scan (attempt {retry_count + 1}/{max_retries}): {e}")
+        import traceback
+        traceback.print_exc()
+
+        # Retry with exponential backoff
+        if retry_count < max_retries:
+            retry_delay = 5 * (2 ** retry_count)  # 5s, 10s, 20s
+            print(f"🔄 Retrying watchlist scheduling in {retry_delay} seconds...")
+            retry_timer = threading.Timer(retry_delay, lambda: schedule_next_watchlist_scan(retry_count + 1, max_retries))
+            retry_timer.daemon = True
+            retry_timer.start()
+        else:
+            print(f"❌ [FATAL] Failed to schedule watchlist scan after {max_retries} attempts!")
+            print("⚠️ MANUAL INTERVENTION REQUIRED - Watchlist auto-scanning will not run!")
 
 def _process_watchlist_scan_automatically():
     """Main automatic scanning logic that runs in background thread."""
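
Stepping back, the rewritten wishlist and watchlist schedulers share a single pattern: cancel any previous timer, start the replacement, publish the shared countdown state only after start() has succeeded, and retry with 5s/10s/20s exponential backoff if scheduling throws. The sketch below distills that pattern into a generic helper. It is not code from the patch; schedule_atomically and the state dict are invented stand-ins for the module-level globals web_server.py actually uses.

    # Distilled sketch of the cancel -> start -> publish ordering used by both
    # schedule_next_wishlist_processing and schedule_next_watchlist_scan.
    # NOT part of the patch; names here are illustrative.
    import threading
    import time

    def schedule_atomically(state, interval, callback, retry_count=0, max_retries=3):
        """state is a dict with 'lock', 'timer', and 'next_run_time' keys."""
        try:
            with state['lock']:
                if state['timer'] is not None:  # never leave an orphaned timer running
                    state['timer'].cancel()
                new_timer = threading.Timer(interval, callback)
                new_timer.daemon = True
                new_timer.start()
                # Publish shared state only after the timer is actually running, so a
                # countdown reader never sees a next_run_time with no live timer behind it.
                state['next_run_time'] = time.time() + interval
                state['timer'] = new_timer
        except Exception:
            if retry_count < max_retries:
                delay = 5 * (2 ** retry_count)  # exponential backoff: 5s, 10s, 20s
                retry = threading.Timer(delay, schedule_atomically,
                                        args=(state, interval, callback,
                                              retry_count + 1, max_retries))
                retry.daemon = True
                retry.start()

    wishlist_state = {'lock': threading.Lock(), 'timer': None, 'next_run_time': 0}
    schedule_atomically(wishlist_state, 1800.0, lambda: print("run wishlist cycle"))

The ordering is the load-bearing detail: the old code set the next-run timestamp before constructing and starting the timer, so a failure in between left the countdown pointing at a run that would never fire, which is the stuck-timer symptom the commit message describes.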