|
|
|
|
@ -4336,7 +4336,8 @@ def _post_process_matched_download(context_key, context, file_path):
|
|
|
|
|
if task_id and batch_id:
|
|
|
|
|
print(f"🎯 [Post-Process] Calling completion callback for task {task_id} in batch {batch_id}")
|
|
|
|
|
|
|
|
|
|
# CRITICAL: Mark task as stream processed to prevent verification workflow from running
|
|
|
|
|
# Mark task as stream processed to prevent duplicate stream processing
|
|
|
|
|
# NOTE: Verification workflow will still run to ensure file is in transfer folder
|
|
|
|
|
with tasks_lock:
|
|
|
|
|
if task_id in download_tasks:
|
|
|
|
|
download_tasks[task_id]['stream_processed'] = True
|
|
|
|
|
@ -6082,7 +6083,7 @@ def _build_batch_status_data(batch_id, batch, live_transfers_lookup):
|
|
|
|
|
elif 'Completed' in state_str or 'Succeeded' in state_str:
|
|
|
|
|
# NEW VERIFICATION WORKFLOW: Use intermediate post_processing status
|
|
|
|
|
# Only set this status once to prevent multiple worker submissions
|
|
|
|
|
if task['status'] != 'post_processing' and not task.get('stream_processed', False):
|
|
|
|
|
if task['status'] != 'post_processing':
|
|
|
|
|
task_status['status'] = 'post_processing'
|
|
|
|
|
task['status'] = 'post_processing'
|
|
|
|
|
print(f"🔄 Task {task_id} API reports 'Succeeded' - starting post-processing verification")
|
|
|
|
|
@ -6090,13 +6091,10 @@ def _build_batch_status_data(batch_id, batch, live_transfers_lookup):
|
|
|
|
|
# Submit post-processing worker to verify file and complete the task
|
|
|
|
|
missing_download_executor.submit(_run_post_processing_worker, task_id, batch_id)
|
|
|
|
|
else:
|
|
|
|
|
# Keep showing post_processing status until worker completes, or mark completed if already stream processed
|
|
|
|
|
if task.get('stream_processed', False):
|
|
|
|
|
print(f"⏭️ Task {task_id} already processed by stream worker, marking as completed")
|
|
|
|
|
task_status['status'] = 'completed'
|
|
|
|
|
task['status'] = 'completed' # Update internal status too
|
|
|
|
|
else:
|
|
|
|
|
task_status['status'] = 'post_processing'
|
|
|
|
|
# FIXED: Always require verification workflow - no bypass for stream processed tasks
|
|
|
|
|
# Stream processing only handles metadata, not file verification
|
|
|
|
|
task_status['status'] = 'post_processing'
|
|
|
|
|
print(f"🔄 Task {task_id} waiting for verification worker to complete")
|
|
|
|
|
elif 'InProgress' in state_str:
|
|
|
|
|
task_status['status'] = 'downloading'
|
|
|
|
|
else:
|
|
|
|
|
|