|
|
|
|
@ -278,19 +278,21 @@ class SyncStatusProcessingWorkerSignals(QObject):
|
|
|
|
|
class SyncStatusProcessingWorker(QRunnable):
|
|
|
|
|
"""
|
|
|
|
|
Runs download status processing in a background thread for the sync modal.
|
|
|
|
|
It checks the slskd API and the file system to provide a reliable status.
|
|
|
|
|
It checks the slskd API to provide a reliable status, with fallbacks.
|
|
|
|
|
This implementation is based on the working logic from downloads.py to restore live updates.
|
|
|
|
|
"""
|
|
|
|
|
def __init__(self, soulseek_client, download_items_data, transfers_directory):
|
|
|
|
|
def __init__(self, soulseek_client, download_items_data):
|
|
|
|
|
super().__init__()
|
|
|
|
|
self.signals = SyncStatusProcessingWorkerSignals()
|
|
|
|
|
self.soulseek_client = soulseek_client
|
|
|
|
|
self.download_items_data = download_items_data
|
|
|
|
|
self.transfers_directory = transfers_directory # NEW: Pass the transfers directory
|
|
|
|
|
# This worker no longer performs filesystem checks, so it doesn't need transfers_directory.
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
"""The main logic of the background worker."""
|
|
|
|
|
try:
|
|
|
|
|
import asyncio
|
|
|
|
|
import os
|
|
|
|
|
loop = asyncio.new_event_loop()
|
|
|
|
|
asyncio.set_event_loop(loop)
|
|
|
|
|
|
|
|
|
|
@ -303,22 +305,44 @@ class SyncStatusProcessingWorker(QRunnable):
|
|
|
|
|
if not transfers_data:
|
|
|
|
|
transfers_data = []
|
|
|
|
|
|
|
|
|
|
all_transfers = [
|
|
|
|
|
file for user_data in transfers_data if 'directories' in user_data
|
|
|
|
|
for directory in user_data['directories'] if 'files' in directory
|
|
|
|
|
for file in directory['files']
|
|
|
|
|
]
|
|
|
|
|
# --- FIX: More robustly parse the transfers data ---
|
|
|
|
|
# Errored/finished downloads might not be nested inside 'directories'.
|
|
|
|
|
# This checks for a 'files' list at both the user and directory levels.
|
|
|
|
|
all_transfers = []
|
|
|
|
|
for user_data in transfers_data:
|
|
|
|
|
# Check for files directly under the user object
|
|
|
|
|
if 'files' in user_data and isinstance(user_data['files'], list):
|
|
|
|
|
all_transfers.extend(user_data['files'])
|
|
|
|
|
# Also check for files nested inside directories
|
|
|
|
|
if 'directories' in user_data and isinstance(user_data['directories'], list):
|
|
|
|
|
for directory in user_data['directories']:
|
|
|
|
|
if 'files' in directory and isinstance(directory['files'], list):
|
|
|
|
|
all_transfers.extend(directory['files'])
|
|
|
|
|
|
|
|
|
|
transfers_by_id = {t['id']: t for t in all_transfers}
|
|
|
|
|
|
|
|
|
|
for item_data in self.download_items_data:
|
|
|
|
|
matching_transfer = transfers_by_id.get(item_data['download_id'])
|
|
|
|
|
matching_transfer = None
|
|
|
|
|
|
|
|
|
|
# Step 1: Try to match by the original download ID.
|
|
|
|
|
if item_data.get('download_id'):
|
|
|
|
|
matching_transfer = transfers_by_id.get(item_data['download_id'])
|
|
|
|
|
|
|
|
|
|
# Step 2: If no match by ID, fall back to an exact filename match.
|
|
|
|
|
if not matching_transfer:
|
|
|
|
|
expected_basename = os.path.basename(item_data['file_path']).lower()
|
|
|
|
|
for t in all_transfers:
|
|
|
|
|
api_basename = os.path.basename(t.get('filename', '')).lower()
|
|
|
|
|
if api_basename == expected_basename:
|
|
|
|
|
matching_transfer = t
|
|
|
|
|
print(f"ℹ️ Found download for '{expected_basename}' by exact filename match.")
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
if matching_transfer:
|
|
|
|
|
state = matching_transfer.get('state', 'Unknown')
|
|
|
|
|
progress = matching_transfer.get('percentComplete', 0)
|
|
|
|
|
new_status = 'queued'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Determine status with correct priority (Errored/Cancelled before Completed)
|
|
|
|
|
if 'Cancelled' in state or 'Canceled' in state:
|
|
|
|
|
new_status = 'cancelled'
|
|
|
|
|
elif 'Failed' in state or 'Errored' in state:
|
|
|
|
|
@ -339,34 +363,14 @@ class SyncStatusProcessingWorker(QRunnable):
|
|
|
|
|
}
|
|
|
|
|
results.append(payload)
|
|
|
|
|
else:
|
|
|
|
|
# --- NEW LOGIC: Check file system for completed downloads ---
|
|
|
|
|
# The download is no longer in the API. Check if the file exists.
|
|
|
|
|
import os
|
|
|
|
|
expected_filename = os.path.basename(item_data['file_path'])
|
|
|
|
|
expected_filepath = os.path.join(self.transfers_directory, expected_filename)
|
|
|
|
|
|
|
|
|
|
if os.path.exists(expected_filepath):
|
|
|
|
|
# File exists! The download was successful.
|
|
|
|
|
print(f"✅ Verified completed download via file system: {expected_filename}")
|
|
|
|
|
payload = {
|
|
|
|
|
'widget_id': item_data['widget_id'],
|
|
|
|
|
'status': 'completed',
|
|
|
|
|
'progress': 100,
|
|
|
|
|
'transfer_id': item_data['download_id'],
|
|
|
|
|
}
|
|
|
|
|
# If not found in the API, it might have failed or been cancelled.
|
|
|
|
|
# Use a grace period before marking as failed.
|
|
|
|
|
item_data['api_missing_count'] = item_data.get('api_missing_count', 0) + 1
|
|
|
|
|
if item_data['api_missing_count'] >= 3:
|
|
|
|
|
expected_filename = os.path.basename(item_data['file_path'])
|
|
|
|
|
print(f"❌ Download failed (missing from API after 3 checks): {expected_filename}")
|
|
|
|
|
payload = {'widget_id': item_data['widget_id'], 'status': 'failed'}
|
|
|
|
|
results.append(payload)
|
|
|
|
|
else:
|
|
|
|
|
# File doesn't exist. It might be a genuine failure.
|
|
|
|
|
# Use the existing counter logic as a fallback.
|
|
|
|
|
item_data['api_missing_count'] = item_data.get('api_missing_count', 0) + 1
|
|
|
|
|
if item_data['api_missing_count'] >= 3: # After 3 checks (6 seconds)
|
|
|
|
|
print(f"❌ Download {item_data['download_id']} failed (missing from API and file system).")
|
|
|
|
|
payload = {
|
|
|
|
|
'widget_id': item_data['widget_id'],
|
|
|
|
|
'status': 'failed',
|
|
|
|
|
'transfer_id': item_data['download_id'],
|
|
|
|
|
}
|
|
|
|
|
results.append(payload)
|
|
|
|
|
|
|
|
|
|
self.signals.completed.emit(results)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
@ -374,8 +378,6 @@ class SyncStatusProcessingWorker(QRunnable):
|
|
|
|
|
traceback.print_exc()
|
|
|
|
|
self.signals.error.emit(str(e))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PlaylistLoaderThread(QThread):
|
|
|
|
|
playlist_loaded = pyqtSignal(object) # Single playlist
|
|
|
|
|
loading_finished = pyqtSignal(int) # Total count
|
|
|
|
|
@ -3117,60 +3119,123 @@ class DownloadMissingTracksModal(QDialog):
|
|
|
|
|
self.on_parallel_track_failed(download_index, str(e))
|
|
|
|
|
|
|
|
|
|
def poll_all_download_statuses(self):
|
|
|
|
|
"""Starts the background worker to process download statuses."""
|
|
|
|
|
if self._is_status_update_running or not self.active_downloads: return
|
|
|
|
|
"""
|
|
|
|
|
Starts the background worker to process download statuses.
|
|
|
|
|
This version is updated to use the new worker and pass the correct data.
|
|
|
|
|
"""
|
|
|
|
|
if self._is_status_update_running or not self.active_downloads:
|
|
|
|
|
return
|
|
|
|
|
self._is_status_update_running = True
|
|
|
|
|
|
|
|
|
|
items_to_check = [{
|
|
|
|
|
'widget_id': d['download_index'], 'download_id': d['download_id'],
|
|
|
|
|
'file_path': d['slskd_result'].filename, 'status': 'downloading'
|
|
|
|
|
} for d in self.active_downloads]
|
|
|
|
|
# Create a snapshot of data needed by the worker thread
|
|
|
|
|
items_to_check = []
|
|
|
|
|
for d in self.active_downloads:
|
|
|
|
|
# Ensure slskd_result exists and has a filename
|
|
|
|
|
if d.get('slskd_result') and hasattr(d['slskd_result'], 'filename'):
|
|
|
|
|
# Pass the current missing count to the worker so it can be incremented
|
|
|
|
|
items_to_check.append({
|
|
|
|
|
'widget_id': d['download_index'],
|
|
|
|
|
'download_id': d.get('download_id'), # Use .get for safety
|
|
|
|
|
'file_path': d['slskd_result'].filename,
|
|
|
|
|
'api_missing_count': d.get('api_missing_count', 0)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if not items_to_check:
|
|
|
|
|
self._is_status_update_running = False
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
# NEW: Get the transfers directory and pass it to the worker
|
|
|
|
|
transfers_dir = self.downloads_page.transfers_directory
|
|
|
|
|
worker = SyncStatusProcessingWorker(self.parent_page.soulseek_client, items_to_check, transfers_dir)
|
|
|
|
|
# The new worker doesn't need the transfers directory.
|
|
|
|
|
worker = SyncStatusProcessingWorker(
|
|
|
|
|
self.parent_page.soulseek_client,
|
|
|
|
|
items_to_check
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
worker.signals.completed.connect(self._handle_processed_status_updates)
|
|
|
|
|
worker.signals.error.connect(lambda e: print(f"Status Worker Error: {e}"))
|
|
|
|
|
self.download_status_pool.start(worker)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_processed_status_updates(self, results):
|
|
|
|
|
"""Applies status updates and triggers retry logic."""
|
|
|
|
|
"""
|
|
|
|
|
Applies status updates from the background worker and triggers retry logic.
|
|
|
|
|
This version correctly handles the payload from the new worker and adds a timeout for stuck downloads.
|
|
|
|
|
"""
|
|
|
|
|
import time
|
|
|
|
|
|
|
|
|
|
# Create a lookup for faster access to active download items
|
|
|
|
|
active_downloads_map = {d['download_index']: d for d in self.active_downloads}
|
|
|
|
|
|
|
|
|
|
for result in results:
|
|
|
|
|
download_index = result['widget_id']
|
|
|
|
|
new_status = result['status']
|
|
|
|
|
|
|
|
|
|
download_info = next((d for d in self.active_downloads if d['download_index'] == download_index), None)
|
|
|
|
|
if not download_info: continue
|
|
|
|
|
|
|
|
|
|
is_stuck = False
|
|
|
|
|
if new_status == 'queued':
|
|
|
|
|
if 'queued_start_time' not in download_info:
|
|
|
|
|
download_info['queued_start_time'] = time.time()
|
|
|
|
|
elif time.time() - download_info['queued_start_time'] > 90:
|
|
|
|
|
is_stuck = True
|
|
|
|
|
else:
|
|
|
|
|
if 'queued_start_time' in download_info: del download_info['queued_start_time']
|
|
|
|
|
|
|
|
|
|
if new_status in ['failed', 'cancelled'] or is_stuck:
|
|
|
|
|
self.active_downloads.remove(download_info)
|
|
|
|
|
download_info = active_downloads_map.get(download_index)
|
|
|
|
|
if not download_info:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# Update the main download_info object with the latest missing count from the worker
|
|
|
|
|
# This is important for the grace period logic to work across polls.
|
|
|
|
|
if 'api_missing_count' in result:
|
|
|
|
|
download_info['api_missing_count'] = result['api_missing_count']
|
|
|
|
|
|
|
|
|
|
# Update the download_id if the worker found a match by filename
|
|
|
|
|
if result.get('transfer_id') and download_info.get('download_id') != result['transfer_id']:
|
|
|
|
|
print(f"ℹ️ Corrected download ID for '{download_info['slskd_result'].filename}'")
|
|
|
|
|
download_info['download_id'] = result['transfer_id']
|
|
|
|
|
|
|
|
|
|
# Handle terminal states (completed, failed, cancelled)
|
|
|
|
|
if new_status in ['failed', 'cancelled']:
|
|
|
|
|
if download_info in self.active_downloads:
|
|
|
|
|
self.active_downloads.remove(download_info)
|
|
|
|
|
self.retry_parallel_download_with_fallback(download_info)
|
|
|
|
|
|
|
|
|
|
elif new_status == 'completed':
|
|
|
|
|
self.active_downloads.remove(download_info)
|
|
|
|
|
if download_info in self.active_downloads:
|
|
|
|
|
self.active_downloads.remove(download_info)
|
|
|
|
|
self.on_parallel_track_completed(download_index, success=True)
|
|
|
|
|
|
|
|
|
|
# Handle transient states (downloading, queued)
|
|
|
|
|
elif new_status == 'downloading':
|
|
|
|
|
progress = result.get('progress', 0)
|
|
|
|
|
self.track_table.setItem(download_info['table_index'], 4, QTableWidgetItem(f"⏬ Downloading ({progress}%)"))
|
|
|
|
|
|
|
|
|
|
# Reset queue timer if it exists
|
|
|
|
|
if 'queued_start_time' in download_info:
|
|
|
|
|
del download_info['queued_start_time']
|
|
|
|
|
|
|
|
|
|
# --- FIX: Add timeout for downloads stuck at 0% ---
|
|
|
|
|
# This handles cases where the API reports "InProgress" but no data is moving.
|
|
|
|
|
if progress < 1:
|
|
|
|
|
if 'downloading_start_time' not in download_info:
|
|
|
|
|
download_info['downloading_start_time'] = time.time()
|
|
|
|
|
# 90-second timeout for being stuck at 0%
|
|
|
|
|
elif time.time() - download_info['downloading_start_time'] > 90:
|
|
|
|
|
print(f"⚠️ Download for '{download_info['slskd_result'].filename}' is stuck at 0%. Retrying.")
|
|
|
|
|
if download_info in self.active_downloads:
|
|
|
|
|
self.active_downloads.remove(download_info)
|
|
|
|
|
self.retry_parallel_download_with_fallback(download_info)
|
|
|
|
|
else:
|
|
|
|
|
# Progress is being made, reset the timer
|
|
|
|
|
if 'downloading_start_time' in download_info:
|
|
|
|
|
del download_info['downloading_start_time']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
elif new_status == 'queued':
|
|
|
|
|
self.track_table.setItem(download_info['table_index'], 4, QTableWidgetItem("... Queued"))
|
|
|
|
|
# Start a timer to detect if it's stuck in queue
|
|
|
|
|
if 'queued_start_time' not in download_info:
|
|
|
|
|
download_info['queued_start_time'] = time.time()
|
|
|
|
|
elif time.time() - download_info['queued_start_time'] > 90: # 90-second timeout
|
|
|
|
|
print(f"⚠️ Download for '{download_info['slskd_result'].filename}' is stuck in queue. Retrying.")
|
|
|
|
|
if download_info in self.active_downloads:
|
|
|
|
|
self.active_downloads.remove(download_info)
|
|
|
|
|
self.retry_parallel_download_with_fallback(download_info)
|
|
|
|
|
|
|
|
|
|
self._is_status_update_running = False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def retry_parallel_download_with_fallback(self, failed_download_info):
|
|
|
|
|
"""Retries a failed download by selecting the next-best cached candidate."""
|
|
|
|
|
download_index = failed_download_info['download_index']
|
|
|
|
|
|