Improve wishlist and watchlist auto-scheduling robustness

Refactored wishlist and watchlist auto-processing scheduling to use atomic timer updates and retry logic, preventing stuck timers and improving reliability. Removed excessive debug logging and improved error handling throughout related functions. Added checks to prevent concurrent wishlist processing and ensured timer state is managed consistently.
pull/115/head
Broque Thomas 4 months ago
parent 007640a37f
commit 2ede3ddee7

@ -3845,10 +3845,8 @@ def _find_completed_file_robust(download_dir, api_filename, transfer_dir=None):
# YOUTUBE SUPPORT: Handle encoded filename format "video_id||title"
# Extract just the title part for file matching
if '||' in api_filename:
print(f"🎵 Detected YouTube encoded filename: {api_filename}")
_, title = api_filename.split('||', 1)
api_filename = title # Use just the title for file searching
print(f"🎵 Extracted title for search: {api_filename}")
def normalize_for_finding(text: str) -> str:
"""A powerful normalization function adapted from matching_engine.py."""
@ -3864,8 +3862,6 @@ def _find_completed_file_robust(download_dir, api_filename, transfer_dir=None):
def search_in_directory(search_dir, location_name):
"""Search for the file in a specific directory."""
print(f" searching for normalized filename '{normalized_target}' in '{search_dir}'...")
best_match_path = None
highest_similarity = 0.0
@ -3875,7 +3871,7 @@ def _find_completed_file_robust(download_dir, api_filename, transfer_dir=None):
# Direct match is the best case
if os.path.basename(file) == target_basename:
file_path = os.path.join(root, file)
print(f"Found exact match: {file_path}")
print(f"Found exact match in {location_name}: {file_path}")
return file_path, 1.0
# Fuzzy matching for variations
@ -3898,27 +3894,22 @@ def _find_completed_file_robust(download_dir, api_filename, transfer_dir=None):
# Use a high confidence threshold for fuzzy matches to prevent false positives
if downloads_similarity > 0.85:
location = 'downloads'
if downloads_similarity == 1.0:
print(f"✅ Found exact match in downloads: {best_downloads_path}")
else:
print(f"✅ Found best fuzzy match in downloads with similarity {downloads_similarity:.2f}: {best_downloads_path}")
if downloads_similarity < 1.0:
print(f"✅ Found fuzzy match in downloads ({downloads_similarity:.2f}): {best_downloads_path}")
return (best_downloads_path, location)
# If not found in downloads and transfer_dir is provided, search there
transfer_similarity = 0.0 # Initialize transfer_similarity
if transfer_dir and os.path.exists(transfer_dir):
print(f"🔍 File not found in downloads, checking transfer folder...")
best_transfer_path, transfer_similarity = search_in_directory(transfer_dir, 'transfer')
if transfer_similarity > 0.85:
location = 'transfer'
if transfer_similarity == 1.0:
print(f"✅ Found exact match in transfer: {best_transfer_path}")
else:
print(f"✅ Found best fuzzy match in transfer with similarity {transfer_similarity:.2f}: {best_transfer_path}")
if transfer_similarity < 1.0:
print(f"✅ Found fuzzy match in transfer ({transfer_similarity:.2f}): {best_transfer_path}")
return (best_transfer_path, location)
print(f"❌ Could not find a confident match for '{target_basename}' in any location. Best similarity was {max(downloads_similarity, transfer_similarity):.2f}.")
# Don't spam logs - file not found is common for completed/processed downloads
return (None, None)
@ -4046,16 +4037,11 @@ def get_download_status():
processing_thread.start()
# Also include YouTube downloads in the response
print(f"🔍 [Status] Starting YouTube downloads check...")
try:
all_youtube_downloads = asyncio.run(soulseek_client.get_all_downloads())
print(f"🔍 [Debug] Fetched {len(all_youtube_downloads)} total downloads from orchestrator")
youtube_count = sum(1 for d in all_youtube_downloads if d.username == 'youtube')
print(f"🔍 [Debug] Found {youtube_count} YouTube downloads")
for download in all_youtube_downloads:
if download.username == 'youtube':
print(f"🔍 [Debug] Processing YouTube download: {download.id}, state: {download.state}")
# Convert DownloadStatus to transfer format that frontend expects
youtube_transfer = {
'id': download.id,
@ -4073,14 +4059,9 @@ def get_download_status():
# Check if YouTube download is completed and needs post-processing
if download.state and ('succeeded' in download.state.lower() or 'completed' in download.state.lower()):
context_key = f"{download.username}::{extract_filename(download.filename)}"
print(f"🔍 [YouTube] Checking completed download - Context key: {context_key}")
print(f"🔍 [YouTube] Download state: {download.state}")
with matched_context_lock:
context = matched_downloads_context.get(context_key)
print(f"🔍 [YouTube] Context found: {context is not None}")
if not context:
print(f"🔍 [YouTube] Available context keys: {list(matched_downloads_context.keys())}")
if context and context_key not in _processed_download_ids:
download_dir = docker_resolve_path(config_manager.get('soulseek.download_path', './downloads'))
@ -4104,6 +4085,10 @@ def get_download_status():
processing_thread = threading.Thread(target=process_youtube_download)
processing_thread.daemon = True
processing_thread.start()
else:
# File not found - likely already processed and moved to library
# Mark as processed to prevent infinite checking
_processed_download_ids.add(context_key)
except Exception as yt_error:
import traceback
print(f"⚠️ Could not fetch YouTube downloads for status: {yt_error}")
@ -8820,11 +8805,9 @@ def _sanitize_track_data_for_processing(track_data):
raw_album = sanitized.get('album', '')
if isinstance(raw_album, dict) and 'name' in raw_album:
sanitized['album'] = raw_album['name']
print(f"🔧 [Sanitize] Converted album from dict to string: '{raw_album['name']}'")
elif not isinstance(raw_album, str):
sanitized['album'] = str(raw_album)
print(f"🔧 [Sanitize] Converted album to string: '{sanitized['album']}'")
# Handle artists field - ensure it's a list of strings
raw_artists = sanitized.get('artists', [])
if isinstance(raw_artists, list):
@ -8834,10 +8817,8 @@ def _sanitize_track_data_for_processing(track_data):
processed_artists.append(artist)
elif isinstance(artist, dict) and 'name' in artist:
processed_artists.append(artist['name'])
print(f"🔧 [Sanitize] Converted artist from dict to string: '{artist['name']}'")
else:
processed_artists.append(str(artist))
print(f"🔧 [Sanitize] Converted artist to string: '{str(artist)}'")
sanitized['artists'] = processed_artists
else:
print(f"⚠️ [Sanitize] Unexpected artists format: {type(raw_artists)}")
@ -8970,16 +8951,55 @@ def stop_wishlist_auto_processing():
wishlist_auto_processing_timestamp = 0
wishlist_next_run_time = 0 # Clear countdown timer
def schedule_next_wishlist_processing():
"""Schedule next automatic wishlist processing in 30 minutes."""
def schedule_next_wishlist_processing(retry_count=0, max_retries=3):
"""
Schedule next automatic wishlist processing in 30 minutes.
Includes retry logic and atomic timer updates to prevent "0s" stuck state.
Args:
retry_count: Current retry attempt (internal use)
max_retries: Maximum number of retry attempts
"""
global wishlist_auto_timer, wishlist_next_run_time
with wishlist_timer_lock:
print("⏰ Scheduling next automatic wishlist processing in 30 minutes")
wishlist_next_run_time = time.time() + 1800.0 # Set timestamp for countdown display
wishlist_auto_timer = threading.Timer(1800.0, _process_wishlist_automatically) # 30 minutes (1800 seconds)
wishlist_auto_timer.daemon = True
wishlist_auto_timer.start()
try:
with wishlist_timer_lock:
# Cancel existing timer if present (prevent orphaned timers)
if wishlist_auto_timer is not None:
try:
wishlist_auto_timer.cancel()
except Exception as cancel_error:
print(f"⚠️ Failed to cancel old wishlist timer: {cancel_error}")
# Calculate next run time BEFORE creating timer
next_time = time.time() + 1800.0 # 30 minutes
# Create and start new timer
new_timer = threading.Timer(1800.0, _process_wishlist_automatically)
new_timer.daemon = True
new_timer.start()
# Only update globals AFTER successful timer creation and start
wishlist_next_run_time = next_time
wishlist_auto_timer = new_timer
print(f"⏰ Scheduled next wishlist processing in 30 minutes")
except Exception as e:
print(f"❌ [CRITICAL] Failed to schedule wishlist processing (attempt {retry_count + 1}/{max_retries}): {e}")
import traceback
traceback.print_exc()
# Retry with exponential backoff
if retry_count < max_retries:
retry_delay = 5 * (2 ** retry_count) # 5s, 10s, 20s
print(f"🔄 Retrying wishlist scheduling in {retry_delay} seconds...")
retry_timer = threading.Timer(retry_delay, lambda: schedule_next_wishlist_processing(retry_count + 1, max_retries))
retry_timer.daemon = True
retry_timer.start()
else:
print(f"❌ [FATAL] Failed to schedule wishlist processing after {max_retries} attempts!")
print("⚠️ MANUAL INTERVENTION REQUIRED - Wishlist auto-processing will not run!")
def _process_wishlist_automatically():
"""Main automatic processing logic that runs in background thread."""
@ -9035,19 +9055,21 @@ def _process_wishlist_automatically():
with wishlist_timer_lock:
wishlist_auto_processing = False
wishlist_auto_processing_timestamp = 0
wishlist_next_run_time = 0 # Clear old timer before rescheduling
schedule_next_wishlist_processing()
# Don't clear wishlist_next_run_time - let schedule function handle atomically
schedule_next_wishlist_processing() # Has built-in error handling and retry
return
print(f"🎵 [Auto-Wishlist] Found {count} tracks in wishlist, starting automatic processing...")
# Check if wishlist processing is already active
# Check if wishlist processing is already active (auto or manual)
playlist_id = "wishlist"
with tasks_lock:
for batch_id, batch_data in download_batches.items():
if (batch_data.get('playlist_id') == playlist_id and
batch_playlist_id = batch_data.get('playlist_id')
# Check for both auto ('wishlist') and manual ('wishlist_manual') batches
if (batch_playlist_id in ['wishlist', 'wishlist_manual'] and
batch_data.get('phase') not in ['complete', 'error', 'cancelled']):
print("⚠️ Wishlist processing already active in another batch, skipping automatic start")
print(f"⚠️ Wishlist processing already active in another batch ({batch_playlist_id}), skipping automatic start")
with wishlist_timer_lock:
wishlist_auto_processing = False
schedule_next_wishlist_processing()
@ -9281,15 +9303,10 @@ def _process_wishlist_automatically():
with wishlist_timer_lock:
wishlist_auto_processing = False
wishlist_auto_processing_timestamp = 0
wishlist_next_run_time = 0 # Clear old timer before rescheduling
# Don't clear wishlist_next_run_time - let schedule function handle atomically
# CRITICAL: Wrap rescheduling in try/except to prevent timer thread death
try:
schedule_next_wishlist_processing()
except Exception as schedule_error:
print(f"❌ [CRITICAL] Failed to schedule next wishlist processing: {schedule_error}")
import traceback
traceback.print_exc()
# Reschedule next cycle (has built-in error handling and retry logic)
schedule_next_wishlist_processing()
# ===============================
# == DATABASE UPDATER API ==
@ -9731,6 +9748,13 @@ def start_wishlist_missing_downloads():
identical to playlist processing, maintaining exactly 3 concurrent downloads.
"""
try:
# Check if auto-processing is currently running (prevent concurrent wishlist access)
if is_wishlist_actually_processing():
return jsonify({
"error": "Wishlist auto-processing is currently running. Please wait for it to complete.",
"retry_after": 30
}), 409
data = request.get_json() or {}
force_download_all = data.get('force_download_all', False)
category = data.get('category') # Get category filter (albums or singles)
@ -11378,16 +11402,11 @@ def _process_failed_tracks_to_wishlist_exact_with_auto_completion(batch_id):
with wishlist_timer_lock:
wishlist_auto_processing = False
wishlist_auto_processing_timestamp = 0
wishlist_next_run_time = 0 # Clear old timer before rescheduling
# Don't clear wishlist_next_run_time here - let schedule function handle it atomically
# Schedule next automatic processing cycle
# Schedule next automatic processing cycle (handles timer atomically with retry logic)
print("⏰ [Auto-Wishlist] Scheduling next automatic cycle in 30 minutes")
try:
schedule_next_wishlist_processing()
except Exception as schedule_error:
print(f"❌ [CRITICAL] Failed to schedule next wishlist processing: {schedule_error}")
import traceback
traceback.print_exc()
schedule_next_wishlist_processing() # Has built-in error handling and retry
return completion_summary
@ -11400,16 +11419,11 @@ def _process_failed_tracks_to_wishlist_exact_with_auto_completion(batch_id):
with wishlist_timer_lock:
wishlist_auto_processing = False
wishlist_auto_processing_timestamp = 0
wishlist_next_run_time = 0 # Clear old timer before rescheduling
# Don't clear wishlist_next_run_time here - let schedule function handle it atomically
# Schedule next cycle even after error to maintain continuity
# Schedule next cycle even after error to maintain continuity (has built-in retry logic)
print("⏰ [Auto-Wishlist] Scheduling next cycle after error (30 minutes)")
try:
schedule_next_wishlist_processing()
except Exception as schedule_error:
print(f"❌ [CRITICAL] Failed to schedule next wishlist processing: {schedule_error}")
import traceback
traceback.print_exc()
schedule_next_wishlist_processing() # Has built-in error handling and retry
return {'tracks_added': 0, 'errors': 1, 'total_failed': 0}
@ -17665,16 +17679,55 @@ def stop_watchlist_auto_scanning():
watchlist_auto_scanning = False
watchlist_auto_scanning_timestamp = 0
def schedule_next_watchlist_scan():
"""Schedule next automatic watchlist scan in 24 hours."""
def schedule_next_watchlist_scan(retry_count=0, max_retries=3):
"""
Schedule next automatic watchlist scan in 24 hours.
Includes retry logic and atomic timer updates to prevent "0s" stuck state.
Args:
retry_count: Current retry attempt (internal use)
max_retries: Maximum number of retry attempts
"""
global watchlist_auto_timer, watchlist_next_run_time
with watchlist_timer_lock:
print("⏰ Scheduling next automatic watchlist scan in 24 hours")
watchlist_next_run_time = time.time() + 86400.0 # Set timestamp for countdown display
watchlist_auto_timer = threading.Timer(86400.0, _process_watchlist_scan_automatically) # 24 hours
watchlist_auto_timer.daemon = True
watchlist_auto_timer.start()
try:
with watchlist_timer_lock:
# Cancel existing timer if present (prevent orphaned timers)
if watchlist_auto_timer is not None:
try:
watchlist_auto_timer.cancel()
except Exception as cancel_error:
print(f"⚠️ Failed to cancel old watchlist timer: {cancel_error}")
# Calculate next run time BEFORE creating timer
next_time = time.time() + 86400.0 # 24 hours
# Create and start new timer
new_timer = threading.Timer(86400.0, _process_watchlist_scan_automatically)
new_timer.daemon = True
new_timer.start()
# Only update globals AFTER successful timer creation and start
watchlist_next_run_time = next_time
watchlist_auto_timer = new_timer
print(f"⏰ Scheduled next watchlist scan in 24 hours")
except Exception as e:
print(f"❌ [CRITICAL] Failed to schedule watchlist scan (attempt {retry_count + 1}/{max_retries}): {e}")
import traceback
traceback.print_exc()
# Retry with exponential backoff
if retry_count < max_retries:
retry_delay = 5 * (2 ** retry_count) # 5s, 10s, 20s
print(f"🔄 Retrying watchlist scheduling in {retry_delay} seconds...")
retry_timer = threading.Timer(retry_delay, lambda: schedule_next_watchlist_scan(retry_count + 1, max_retries))
retry_timer.daemon = True
retry_timer.start()
else:
print(f"❌ [FATAL] Failed to schedule watchlist scan after {max_retries} attempts!")
print("⚠️ MANUAL INTERVENTION REQUIRED - Watchlist auto-scanning will not run!")
def _process_watchlist_scan_automatically():
"""Main automatic scanning logic that runs in background thread."""

Loading…
Cancel
Save