Update youtube_client.py

pull/115/head
Broque Thomas 4 months ago
parent eaf33f6f6d
commit ce67e64ff7

@ -14,10 +14,13 @@ import sys
import os
import re
import platform
import asyncio
import uuid
from typing import List, Optional, Dict, Any, Tuple
from dataclasses import dataclass
from pathlib import Path
from datetime import datetime
from enum import Enum
try:
import yt_dlp
@ -28,6 +31,9 @@ from utils.logging_config import get_logger
from core.matching_engine import MusicMatchingEngine
from core.spotify_client import Track as SpotifyTrack
# Import Soulseek data structures for drop-in replacement compatibility
from core.soulseek_client import SearchResult, TrackResult, AlbumResult, DownloadStatus
logger = get_logger("youtube_client")
@ -103,6 +109,11 @@ class YouTubeClient:
logger.error("❌ ffmpeg is required but not found")
logger.error("The client will attempt to auto-download ffmpeg on first use")
# Download queue management (mirrors Soulseek's download tracking)
# Maps download_id -> download_info dict
self.active_downloads: Dict[str, Dict[str, Any]] = {}
self._download_lock = asyncio.Lock()
# Configure yt-dlp options
self.download_opts = {
'format': 'bestaudio/best',
@ -115,10 +126,23 @@ class YouTubeClient:
'preferredcodec': 'mp3',
'preferredquality': '320',
}],
'progress_hooks': [self._progress_hook], # Track download progress
}
# Track download progress
self.current_download_progress = {}
# Track current download progress (mirrors Soulseek transfer tracking)
self.current_download_id: Optional[str] = None
self.current_download_progress = {
'status': 'idle', # idle, downloading, postprocessing, completed, error
'percent': 0.0,
'downloaded_bytes': 0,
'total_bytes': 0,
'speed': 0, # bytes/sec
'eta': 0, # seconds
'filename': ''
}
# Optional progress callback for UI updates
self.progress_callback = None
def is_available(self) -> bool:
"""
@ -139,30 +163,146 @@ class YouTubeClient:
logger.error("yt-dlp is not installed")
return False
async def check_connection(self) -> bool:
    """
    Test if YouTube is accessible by attempting a lightweight API call (async, Soulseek-compatible).

    Only metadata for one fixed public video is requested (``download=False``),
    so nothing is written to disk. The blocking yt-dlp call is pushed to the
    default executor so the event loop keeps running while it waits.

    Returns:
        bool: True if YouTube is reachable, False otherwise
    """
    def _check() -> bool:
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': True,  # Don't download, just extract info
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Metadata-only lookup of a known public video
            info = ydl.extract_info("https://www.youtube.com/watch?v=dQw4w9WgXcQ", download=False)
            return info is not None

    try:
        # We are inside a coroutine, so the running loop is the right one to use
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _check)
    except Exception as e:
        # Any failure (network, extractor, missing yt-dlp) means "not reachable"
        logger.error(f"YouTube connection check failed: {e}")
        return False
def is_configured(self) -> bool:
    """
    Report whether the YouTube client is ready to use (matches Soulseek interface).

    There is no authentication or configuration step for YouTube, so the
    answer is simply whatever is_available() reports.

    Returns:
        bool: True if YouTube client is ready to use
    """
    ready = self.is_available()
    return ready
def set_progress_callback(self, callback):
    """
    Register the function that receives progress updates.

    The callback is invoked as callback(progress_dict), where progress_dict
    carries these keys:
        - status: 'idle', 'downloading', 'postprocessing', 'completed', 'error'
        - percent: 0.0-100.0
        - downloaded_bytes: int
        - total_bytes: int
        - speed: bytes/sec
        - eta: estimated seconds remaining
        - filename: current file being processed

    Args:
        callback: Callable taking one dict argument
    """
    self.progress_callback = callback
def _progress_hook(self, d):
    """
    yt-dlp progress hook - called during download to report progress.

    Updates the active_downloads entry for the current download, refreshes
    current_download_progress, and forwards the latter to progress_callback
    when one is set. Mirrors Soulseek's transfer status updates.

    Args:
        d: Progress dict supplied by yt-dlp. Read keys: 'status',
           'downloaded_bytes', 'total_bytes', 'total_bytes_estimate',
           'speed', 'eta', 'filename'. Any of the numeric values may be
           missing or None.
    """
    try:
        # Read the ID once: it can be cleared elsewhere while this hook runs
        download_id = self.current_download_id
        if not download_id:
            return

        status = d.get('status', 'unknown')
        download_info = self.active_downloads.get(download_id)

        if status == 'downloading':
            # "or 0" also covers keys that are present but set to None
            downloaded = d.get('downloaded_bytes') or 0
            total = d.get('total_bytes') or d.get('total_bytes_estimate') or 0
            speed = d.get('speed') or 0
            eta = d.get('eta') or 0

            if total > 0:
                # A low size estimate must not push the figure past 100%
                percent = min(100.0, (downloaded / total) * 100)
            else:
                percent = 0
            percent = round(percent, 1)

            if download_info is not None:
                download_info['state'] = 'Downloading'
                download_info['progress'] = percent
                download_info['transferred'] = downloaded
                download_info['size'] = total
                download_info['speed'] = int(speed)
                download_info['time_remaining'] = int(eta) if eta > 0 else None

            # Also update current_download_progress for legacy compatibility
            self.current_download_progress = {
                'status': 'downloading',
                'percent': percent,
                'downloaded_bytes': downloaded,
                'total_bytes': total,
                'speed': int(speed),
                'eta': int(eta),
                'filename': d.get('filename', '')
            }

        elif status == 'finished':
            # Transfer is done; yt-dlp still has postprocessing to run
            if download_info is not None:
                download_info['state'] = 'Postprocessing'
                download_info['progress'] = 100.0
            self.current_download_progress['status'] = 'postprocessing'
            self.current_download_progress['percent'] = 100.0

        elif status == 'error':
            if download_info is not None:
                download_info['state'] = 'Errored'
            self.current_download_progress['status'] = 'error'

        else:
            # Unrecognised status: nothing to record or report
            return

        # Call progress callback if set (for UI updates)
        if self.progress_callback:
            self.progress_callback(self.current_download_progress)

    except Exception as e:
        # Best-effort: a reporting failure must never abort the download
        logger.debug(f"Progress hook error: {e}")
def get_download_progress(self) -> dict:
    """
    Return a snapshot of the current download progress (mirrors Soulseek's get_download_status).

    The result is a shallow copy, so callers can modify it without touching
    the client's own progress record.

    Returns:
        Dict with progress information (status, percent, speed, etc.)
    """
    snapshot = dict(self.current_download_progress)
    return snapshot
def _check_ffmpeg(self) -> bool:
"""Check if ffmpeg is available (system PATH or auto-download to tools folder)"""
import shutil
@ -286,64 +426,146 @@ class YouTubeClient:
logger.error(f" Mac: brew install ffmpeg")
return False
def search(self, query: str, max_results: int = 10) -> List[YouTubeSearchResult]:
def _youtube_to_track_result(self, entry: dict, best_audio: Optional[dict] = None) -> TrackResult:
    """
    Convert YouTube video entry to TrackResult (Soulseek-compatible format).

    This is the adapter layer that allows YouTube client to speak Soulseek's language.
    Artist and title are guessed from the video title; when no pattern matches,
    the uploader (or channel) name is used as the artist.

    Args:
        entry: YouTube video entry from yt-dlp
        best_audio: Best audio format info (optional)

    Returns:
        TrackResult object compatible with Soulseek interface
    """
    # yt-dlp may supply the key with a None value, so normalise to a string
    title = entry.get('title') or ''
    artist = None
    track_title = title

    # Common YouTube title patterns, tried in order.
    # The flag tells whether group 1 is the artist (True) or the title (False).
    patterns = [
        (r'^(.+?)\s*[-–—]\s*(.+)$', True),   # Artist - Title
        (r'^(.+?)\s*:\s*(.+)$', True),       # Artist: Title
        (r'^(.+?)\s+by\s+(.+)$', False),     # Title by Artist (reversed)
    ]

    for pattern, artist_first in patterns:
        match = re.match(pattern, title, re.IGNORECASE)
        if match:
            first = match.group(1).strip()
            second = match.group(2).strip()
            if artist_first:
                artist, track_title = first, second
            else:
                track_title, artist = first, second
            break

    # Fallback: use uploader/channel as artist
    if not artist:
        artist = entry.get('uploader') or entry.get('channel') or 'Unknown Artist'

    # Extract file size (exact if known, otherwise yt-dlp's approximation)
    file_size = 0
    if best_audio:
        file_size = best_audio.get('filesize') or best_audio.get('filesize_approx') or 0

    # Extract bitrate; 'abr'/'tbr' are often present but None
    bitrate = None
    if best_audio:
        bitrate = int(best_audio.get('abr') or best_audio.get('tbr') or 0)

    # Duration in milliseconds (Soulseek uses ms)
    duration = entry.get('duration')
    duration_ms = int(duration * 1000) if duration else None

    # Video ID and title are packed into the filename so download() can recover them
    video_id = entry.get('id', '')
    filename = f"{video_id}||{title}"

    return TrackResult(
        username="youtube",       # YouTube doesn't have users - use constant
        filename=filename,
        size=file_size,
        bitrate=bitrate,
        duration=duration_ms,
        quality="mp3",            # We always convert to MP3
        free_upload_slots=999,    # YouTube always available
        upload_speed=999999,      # High speed indicator
        queue_length=0,           # No queue for YouTube
        artist=artist,
        title=track_title,
        album=None,               # YouTube videos don't have album info (will be added from Spotify)
        track_number=None
    )
async def search(self, query: str, timeout: Optional[int] = None, progress_callback=None, *, max_results: int = 10) -> Tuple[List[TrackResult], List[AlbumResult]]:
    """
    Search YouTube for tracks matching the query (async, Soulseek-compatible interface).

    Args:
        query: Search query (e.g., "Artist Name - Song Title")
        timeout: Ignored for YouTube (kept for interface compatibility)
        progress_callback: Accepted for interface compatibility; not invoked here
        max_results: Maximum number of results to request (default 10)

    Returns:
        Tuple of (track_results, album_results). Album results will always be empty for YouTube.
        On any failure the result is ([], []).
    """
    logger.info(f"🔍 Searching YouTube for: {query}")

    limit = max(1, int(max_results))

    def _search():
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'default_search': 'ytsearch',
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            search_results = ydl.extract_info(f"ytsearch{limit}:{query}", download=False)

            if not search_results or 'entries' not in search_results:
                return []

            # Materialise here so any lazy evaluation happens in the worker thread
            return list(search_results['entries'] or [])

    try:
        # Run yt-dlp in executor to avoid blocking event loop
        loop = asyncio.get_running_loop()
        entries = await loop.run_in_executor(None, _search)

        if not entries:
            logger.warning(f"No YouTube results found for: {query}")
            return ([], [])

        # Convert to TrackResult objects
        track_results = []
        for entry in entries:
            if not entry:
                continue
            try:
                best_audio = self._get_best_audio_format(entry.get('formats', []))
                track_results.append(self._youtube_to_track_result(entry, best_audio))
            except Exception as e:
                # One malformed entry should not discard the rest of the results
                logger.warning(f"Skipping unparseable YouTube result: {e}")

        logger.info(f"✅ Found {len(track_results)} YouTube tracks")

        # Return tuple: (tracks, albums) - YouTube doesn't have albums, so return empty list
        return (track_results, [])

    except Exception as e:
        logger.error(f"❌ YouTube search failed: {e}")
        import traceback
        traceback.print_exc()
        return ([], [])
def _get_best_audio_format(self, formats: List[Dict]) -> Optional[Dict]:
"""Extract best audio format from available formats"""
@ -490,139 +712,251 @@ class YouTubeClient:
logger.info(f"✅ Found {len(matches)} matches above {min_confidence} confidence")
return matches
async def download(self, username: str, filename: str, file_size: int = 0) -> Optional[str]:
    """
    Download YouTube video as audio (async, Soulseek-compatible interface).

    The download itself runs as a background task; this method only registers
    it in active_downloads and returns immediately.

    Args:
        username: Ignored for YouTube (always "youtube")
        filename: Encoded as "video_id||title" from search results
        file_size: Ignored for YouTube (kept for interface compatibility)

    Returns:
        download_id: Unique ID for tracking this download, or None if failed to start
    """
    try:
        # Parse filename to extract video_id
        if '||' not in filename:
            logger.error(f"❌ Invalid filename format: {filename}")
            return None

        video_id, title = filename.split('||', 1)
        if not video_id:
            # Without an ID there is no URL to fetch
            logger.error(f"❌ Invalid filename format: {filename}")
            return None

        youtube_url = f"https://www.youtube.com/watch?v={video_id}"

        logger.info(f"📥 Starting YouTube download: {title}")
        logger.info(f"   URL: {youtube_url}")

        # Create unique download ID
        download_id = str(uuid.uuid4())

        # Register the entry and its task in one step, so the entry is never
        # visible without a task to cancel. The task cannot start running
        # until this coroutine next yields.
        async with self._download_lock:
            task = asyncio.create_task(self._download_internal(download_id, youtube_url, title))
            self.active_downloads[download_id] = {
                'id': download_id,
                'filename': title,
                'username': 'youtube',
                'state': 'Initializing',
                'progress': 0.0,
                'size': file_size or 0,
                'transferred': 0,
                'speed': 0,
                'time_remaining': None,
                'video_id': video_id,
                'url': youtube_url,
                'title': title,
                'file_path': None,  # Will be set when download completes
                'task': task        # Background task performing the download
            }

        logger.info(f"✅ Download started with ID: {download_id}")
        return download_id

    except Exception as e:
        logger.error(f"❌ Failed to start download: {e}")
        import traceback
        traceback.print_exc()
        return None
async def _download_internal(self, download_id: str, youtube_url: str, title: str):
    """
    Internal method to perform the actual YouTube download in the background.

    Moves the tracking entry through 'Downloading' and then 'Completed',
    'Errored' or 'Cancelled'. The blocking yt-dlp call runs in the default
    executor. CancelledError is re-raised after the state is recorded; every
    other exception is logged and absorbed.

    Args:
        download_id: Unique download ID
        youtube_url: YouTube video URL
        title: Video title for display (not used by the download itself)
    """
    async def _update_entry(**fields):
        # Apply updates to this download's entry, if it has not been removed
        async with self._download_lock:
            entry = self.active_downloads.get(download_id)
            if entry is not None:
                entry.update(fields)

    def _download():
        # Blocking yt-dlp work; returns the final file path or None
        try:
            # Use default download options
            download_opts = self.download_opts.copy()

            # Perform download
            with yt_dlp.YoutubeDL(download_opts) as ydl:
                info = ydl.extract_info(youtube_url, download=True)

                # Get final filename (will be MP3 after ffmpeg conversion)
                filename = Path(ydl.prepare_filename(info)).with_suffix('.mp3')

                if filename.exists():
                    return str(filename)

                logger.error(f"❌ Download completed but file not found: {filename}")
                return None

        except Exception as e:
            logger.error(f"❌ Download failed in thread: {e}")
            import traceback
            traceback.print_exc()
            return None

    try:
        # Update state to downloading
        await _update_entry(state='Downloading')

        # Set current download ID for progress tracking.
        # NOTE(review): this is one shared attribute, so two downloads running
        # at once overwrite each other's ID and the progress hook may report
        # against the wrong entry - confirm whether concurrency is intended.
        self.current_download_id = download_id

        # Run yt-dlp download in thread pool (to avoid blocking event loop)
        loop = asyncio.get_running_loop()
        file_path = await loop.run_in_executor(None, _download)

        if file_path:
            # Mark download as completed
            await _update_entry(state='Completed', progress=100.0, file_path=file_path)
            logger.info(f"✅ Download {download_id} completed: {file_path}")
        else:
            # Mark as error
            await _update_entry(state='Errored')
            logger.error(f"❌ Download {download_id} failed")

    except asyncio.CancelledError:
        # Download was cancelled; record it, then let the cancellation propagate
        await _update_entry(state='Cancelled')
        logger.info(f"⚠️ Download {download_id} cancelled")
        raise

    except Exception as e:
        # Download error
        await _update_entry(state='Errored')
        logger.error(f"❌ Download {download_id} failed: {e}")
        import traceback
        traceback.print_exc()

    finally:
        # Clear current download ID, unless another download has claimed it
        if self.current_download_id == download_id:
            self.current_download_id = None
async def get_all_downloads(self) -> List[DownloadStatus]:
    """
    Get all active downloads (matches Soulseek interface).

    Every entry in active_downloads is reported, whatever its state.

    Returns:
        List of DownloadStatus objects for all active downloads
    """
    async with self._download_lock:
        return [
            DownloadStatus(
                id=download_info['id'],
                filename=download_info['filename'],
                username=download_info['username'],
                state=download_info['state'],
                progress=download_info['progress'],
                size=download_info['size'],
                transferred=download_info['transferred'],
                speed=download_info['speed'],
                time_remaining=download_info.get('time_remaining')
            )
            for download_info in self.active_downloads.values()
        ]
async def get_download_status(self, download_id: str) -> Optional[DownloadStatus]:
    """
    Get status of a specific download (matches Soulseek interface).

    Args:
        download_id: Download ID to query

    Returns:
        DownloadStatus object or None if not found
    """
    async with self._download_lock:
        download_info = self.active_downloads.get(download_id)
        if download_info is None:
            return None

        return DownloadStatus(
            id=download_info['id'],
            filename=download_info['filename'],
            username=download_info['username'],
            state=download_info['state'],
            progress=download_info['progress'],
            size=download_info['size'],
            transferred=download_info['transferred'],
            speed=download_info['speed'],
            time_remaining=download_info.get('time_remaining')
        )
async def cancel_download(self, download_id: str, username: Optional[str] = None, remove: bool = False) -> bool:
    """
    Cancel an active download (matches Soulseek interface).

    A download that has already reached 'Completed' or 'Errored' keeps that
    state; only downloads still in progress are marked 'Cancelled'.

    Args:
        download_id: Download ID to cancel
        username: Ignored for YouTube (kept for interface compatibility)
        remove: If True, remove from active downloads after cancelling

    Returns:
        True if the download was found and handled, False if it is unknown
        or an error occurred
    """
    try:
        async with self._download_lock:
            download_info = self.active_downloads.get(download_id)
            if download_info is None:
                logger.warning(f"⚠️ Download {download_id} not found")
                return False

            task = download_info.get('task')

            # Cancel the background task if it exists
            if task and not task.done():
                task.cancel()
                logger.info(f"⚠️ Cancelled download {download_id}")

            # Update state, but never overwrite a finished outcome
            if download_info.get('state') not in ('Completed', 'Errored'):
                download_info['state'] = 'Cancelled'

            # Remove from active downloads if requested
            if remove:
                del self.active_downloads[download_id]
                logger.info(f"🗑️ Removed download {download_id} from queue")

            return True

    except Exception as e:
        logger.error(f"❌ Failed to cancel download {download_id}: {e}")
        return False
def _enhance_metadata(self, filepath: str, spotify_track: Optional[SpotifyTrack], yt_result: YouTubeSearchResult, track_number: int = 1, disc_number: int = 1, release_year: str = None, artist_genres: list = None):
"""

Loading…
Cancel
Save