diff --git a/core/youtube_client.py b/core/youtube_client.py index 329ad4b6..14cd6934 100644 --- a/core/youtube_client.py +++ b/core/youtube_client.py @@ -14,10 +14,13 @@ import sys import os import re import platform +import asyncio +import uuid from typing import List, Optional, Dict, Any, Tuple from dataclasses import dataclass from pathlib import Path from datetime import datetime +from enum import Enum try: import yt_dlp @@ -28,6 +31,9 @@ from utils.logging_config import get_logger from core.matching_engine import MusicMatchingEngine from core.spotify_client import Track as SpotifyTrack +# Import Soulseek data structures for drop-in replacement compatibility +from core.soulseek_client import SearchResult, TrackResult, AlbumResult, DownloadStatus + logger = get_logger("youtube_client") @@ -103,6 +109,11 @@ class YouTubeClient: logger.error("❌ ffmpeg is required but not found") logger.error("The client will attempt to auto-download ffmpeg on first use") + # Download queue management (mirrors Soulseek's download tracking) + # Maps download_id -> download_info dict + self.active_downloads: Dict[str, Dict[str, Any]] = {} + self._download_lock = asyncio.Lock() + # Configure yt-dlp options self.download_opts = { 'format': 'bestaudio/best', @@ -115,10 +126,23 @@ class YouTubeClient: 'preferredcodec': 'mp3', 'preferredquality': '320', }], + 'progress_hooks': [self._progress_hook], # Track download progress } - # Track download progress - self.current_download_progress = {} + # Track current download progress (mirrors Soulseek transfer tracking) + self.current_download_id: Optional[str] = None + self.current_download_progress = { + 'status': 'idle', # idle, downloading, postprocessing, completed, error + 'percent': 0.0, + 'downloaded_bytes': 0, + 'total_bytes': 0, + 'speed': 0, # bytes/sec + 'eta': 0, # seconds + 'filename': '' + } + + # Optional progress callback for UI updates + self.progress_callback = None def is_available(self) -> bool: """ @@ -139,30 +163,146 @@ class YouTubeClient: logger.error("yt-dlp is not installed") return False - def check_connection(self) -> bool: + async def check_connection(self) -> bool: """ - Test if YouTube is accessible by attempting a lightweight API call. + Test if YouTube is accessible by attempting a lightweight API call (async, Soulseek-compatible). Returns: bool: True if YouTube is reachable, False otherwise """ try: - ydl_opts = { - 'quiet': True, - 'no_warnings': True, - 'extract_flat': True, # Don't download, just extract info - } - - with yt_dlp.YoutubeDL(ydl_opts) as ydl: - # Try to extract info from a known video (YouTube's own channel trailer) - # This is a lightweight test that doesn't download anything - info = ydl.extract_info("https://www.youtube.com/watch?v=dQw4w9WgXcQ", download=False) - return info is not None + # Run in executor to avoid blocking event loop + loop = asyncio.get_event_loop() + + def _check(): + ydl_opts = { + 'quiet': True, + 'no_warnings': True, + 'extract_flat': True, # Don't download, just extract info + } + + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + # Try to extract info from a known video (YouTube's own channel trailer) + # This is a lightweight test that doesn't download anything + info = ydl.extract_info("https://www.youtube.com/watch?v=dQw4w9WgXcQ", download=False) + return info is not None + + return await loop.run_in_executor(None, _check) except Exception as e: logger.error(f"YouTube connection check failed: {e}") return False + def is_configured(self) -> bool: + """ + Check if YouTube client is configured and ready to use (matches Soulseek interface). + + YouTube doesn't require authentication or configuration like Soulseek, + so this just checks if the client is available. + + Returns: + bool: True if YouTube client is ready to use + """ + return self.is_available() + + def set_progress_callback(self, callback): + """ + Set a callback function for progress updates. + Callback signature: callback(progress_dict) + + Progress dict contains: + - status: 'idle', 'downloading', 'postprocessing', 'completed', 'error' + - percent: 0.0-100.0 + - downloaded_bytes: int + - total_bytes: int + - speed: bytes/sec + - eta: estimated seconds remaining + - filename: current file being processed + """ + self.progress_callback = callback + + def _progress_hook(self, d): + """ + yt-dlp progress hook - called during download to report progress. + Updates the active_downloads dictionary for the current download. + Mirrors Soulseek's transfer status updates. + """ + try: + # Only update if we have a current download ID + if not self.current_download_id: + return + + status = d.get('status', 'unknown') + + if status == 'downloading': + downloaded = d.get('downloaded_bytes', 0) + total = d.get('total_bytes') or d.get('total_bytes_estimate', 0) + speed = d.get('speed', 0) or 0 + eta = d.get('eta', 0) or 0 + + if total > 0: + percent = (downloaded / total) * 100 + else: + percent = 0 + + # Update active downloads dictionary (thread-safe update) + if self.current_download_id in self.active_downloads: + download_info = self.active_downloads[self.current_download_id] + download_info['state'] = 'Downloading' + download_info['progress'] = round(percent, 1) + download_info['transferred'] = downloaded + download_info['size'] = total + download_info['speed'] = int(speed) + download_info['time_remaining'] = int(eta) if eta > 0 else None + + # Also update current_download_progress for legacy compatibility + self.current_download_progress = { + 'status': 'downloading', + 'percent': round(percent, 1), + 'downloaded_bytes': downloaded, + 'total_bytes': total, + 'speed': int(speed), + 'eta': int(eta), + 'filename': d.get('filename', '') + } + + # Call progress callback if set (for UI updates) + if self.progress_callback: + self.progress_callback(self.current_download_progress) + + elif status == 'finished': + # Update to postprocessing state + if self.current_download_id in self.active_downloads: + self.active_downloads[self.current_download_id]['state'] = 'Postprocessing' + self.active_downloads[self.current_download_id]['progress'] = 100.0 + + self.current_download_progress['status'] = 'postprocessing' + self.current_download_progress['percent'] = 100.0 + + if self.progress_callback: + self.progress_callback(self.current_download_progress) + + elif status == 'error': + # Mark as error + if self.current_download_id in self.active_downloads: + self.active_downloads[self.current_download_id]['state'] = 'Errored' + + self.current_download_progress['status'] = 'error' + if self.progress_callback: + self.progress_callback(self.current_download_progress) + + except Exception as e: + logger.debug(f"Progress hook error: {e}") + + def get_download_progress(self) -> dict: + """ + Get current download progress (mirrors Soulseek's get_download_status). + + Returns: + Dict with progress information (status, percent, speed, etc.) + """ + return self.current_download_progress.copy() + def _check_ffmpeg(self) -> bool: """Check if ffmpeg is available (system PATH or auto-download to tools folder)""" import shutil @@ -286,64 +426,146 @@ class YouTubeClient: logger.error(f" Mac: brew install ffmpeg") return False - def search(self, query: str, max_results: int = 10) -> List[YouTubeSearchResult]: + def _youtube_to_track_result(self, entry: dict, best_audio: Optional[dict] = None) -> TrackResult: """ - Search YouTube for tracks matching the query. + Convert YouTube video entry to TrackResult (Soulseek-compatible format). + This is the adapter layer that allows YouTube client to speak Soulseek's language. + + Args: + entry: YouTube video entry from yt-dlp + best_audio: Best audio format info (optional) + + Returns: + TrackResult object compatible with Soulseek interface + """ + # Parse artist and title from YouTube video title + title = entry.get('title', '') + artist = None + track_title = title + + # Common YouTube title patterns: "Artist - Title", "Artist: Title", etc. + patterns = [ + r'^(.+?)\s*[-–—]\s*(.+)$', # Artist - Title + r'^(.+?)\s*:\s*(.+)$', # Artist: Title + r'^(.+?)\s+by\s+(.+)$', # Title by Artist (reversed) + ] + + for pattern in patterns: + match = re.match(pattern, title, re.IGNORECASE) + if match: + if 'by' in pattern: + track_title = match.group(1).strip() + artist = match.group(2).strip() + else: + artist = match.group(1).strip() + track_title = match.group(2).strip() + break + + # Fallback: use uploader/channel as artist + if not artist: + artist = entry.get('uploader', entry.get('channel', 'Unknown Artist')) + + # Extract file size (estimate from format) + file_size = 0 + if best_audio and 'filesize' in best_audio: + file_size = best_audio.get('filesize', 0) or best_audio.get('filesize_approx', 0) or 0 + + # Extract bitrate + bitrate = None + if best_audio: + bitrate = int(best_audio.get('abr', best_audio.get('tbr', 0))) + + # Duration in milliseconds (Soulseek uses ms) + duration_ms = int(entry.get('duration', 0) * 1000) if entry.get('duration') else None + + # Quality string + quality_str = self._format_quality_string(best_audio) if best_audio else "unknown" + + # Video URL as filename (we'll use this to identify the track later) + video_id = entry.get('id', '') + filename = f"{video_id}||{title}" # Store video_id and title for later download + + return TrackResult( + username="youtube", # YouTube doesn't have users - use constant + filename=filename, + size=file_size, + bitrate=bitrate, + duration=duration_ms, + quality="mp3", # We always convert to MP3 + free_upload_slots=999, # YouTube always available + upload_speed=999999, # High speed indicator + queue_length=0, # No queue for YouTube + artist=artist, + title=track_title, + album=None, # YouTube videos don't have album info (will be added from Spotify) + track_number=None + ) + + async def search(self, query: str, timeout: int = None, progress_callback=None) -> tuple[List[TrackResult], List[AlbumResult]]: + """ + Search YouTube for tracks matching the query (async, Soulseek-compatible interface). Args: query: Search query (e.g., "Artist Name - Song Title") - max_results: Maximum number of results to return + timeout: Ignored for YouTube (kept for interface compatibility) + progress_callback: Optional callback for progress updates Returns: - List of YouTubeSearchResult objects + Tuple of (track_results, album_results). Album results will always be empty for YouTube. """ logger.info(f"🔍 Searching YouTube for: {query}") try: - ydl_opts = { - 'quiet': True, - 'no_warnings': True, - 'extract_flat': False, - 'default_search': 'ytsearch', - } - - with yt_dlp.YoutubeDL(ydl_opts) as ydl: - # Search YouTube - search_results = ydl.extract_info(f"ytsearch{max_results}:{query}", download=False) - - if not search_results or 'entries' not in search_results: - logger.warning(f"No YouTube results found for: {query}") - return [] - - results = [] - for entry in search_results['entries']: - if not entry: - continue - - # Get best audio format info - best_audio = self._get_best_audio_format(entry.get('formats', [])) - quality_str = self._format_quality_string(best_audio) - - result = YouTubeSearchResult( - video_id=entry.get('id', ''), - title=entry.get('title', ''), - channel=entry.get('uploader', entry.get('channel', '')), - duration=entry.get('duration', 0), - url=entry.get('webpage_url', f"https://www.youtube.com/watch?v={entry.get('id')}"), - thumbnail=entry.get('thumbnail', ''), - view_count=entry.get('view_count', 0), - upload_date=entry.get('upload_date', ''), - available_quality=quality_str, - best_audio_format=best_audio, - ) - results.append(result) - - logger.info(f"✅ Found {len(results)} YouTube results") - return results + # Run yt-dlp in executor to avoid blocking event loop + loop = asyncio.get_event_loop() + + def _search(): + ydl_opts = { + 'quiet': True, + 'no_warnings': True, + 'extract_flat': False, + 'default_search': 'ytsearch', + } + + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + # Search YouTube (max 10 results) + search_results = ydl.extract_info(f"ytsearch10:{query}", download=False) + + if not search_results or 'entries' not in search_results: + return [] + + return search_results['entries'] + + # Run search in thread pool + entries = await loop.run_in_executor(None, _search) + + if not entries: + logger.warning(f"No YouTube results found for: {query}") + return ([], []) + + # Convert to TrackResult objects + track_results = [] + for entry in entries: + if not entry: + continue + + # Get best audio format info + best_audio = self._get_best_audio_format(entry.get('formats', [])) + + # Convert to TrackResult (Soulseek format) + track_result = self._youtube_to_track_result(entry, best_audio) + track_results.append(track_result) + + logger.info(f"✅ Found {len(track_results)} YouTube tracks") + + # Return tuple: (tracks, albums) - YouTube doesn't have albums, so return empty list + return (track_results, []) except Exception as e: logger.error(f"❌ YouTube search failed: {e}") - return [] + import traceback + traceback.print_exc() + return ([], []) def _get_best_audio_format(self, formats: List[Dict]) -> Optional[Dict]: """Extract best audio format from available formats""" @@ -490,139 +712,251 @@ class YouTubeClient: logger.info(f"✅ Found {len(matches)} matches above {min_confidence} confidence") return matches - def download(self, yt_result: YouTubeSearchResult, spotify_track: Optional[SpotifyTrack] = None) -> Optional[str]: + async def download(self, username: str, filename: str, file_size: int = 0) -> Optional[str]: """ - Download YouTube video as audio with proper metadata tagging (mirrors soulseek download). + Download YouTube video as audio (async, Soulseek-compatible interface). Args: - yt_result: YouTube result to download - spotify_track: Optional Spotify track for metadata embedding + username: Ignored for YouTube (always "youtube") + filename: Encoded as "video_id||title" from search results + file_size: Ignored for YouTube (kept for interface compatibility) Returns: - Path to downloaded file, or None if failed + download_id: Unique ID for tracking this download, or None if failed to start """ - logger.info(f"📥 Starting download: {yt_result.title}") - logger.info(f" Quality: {yt_result.available_quality}") - logger.info(f" URL: {yt_result.url}") + try: + # Parse filename to extract video_id + if '||' not in filename: + logger.error(f"❌ Invalid filename format: {filename}") + return None + + video_id, title = filename.split('||', 1) + youtube_url = f"https://www.youtube.com/watch?v={video_id}" + + logger.info(f"📥 Starting YouTube download: {title}") + logger.info(f" URL: {youtube_url}") + + # Create unique download ID + download_id = str(uuid.uuid4()) + + # Initialize download info in active downloads + async with self._download_lock: + self.active_downloads[download_id] = { + 'id': download_id, + 'filename': title, + 'username': 'youtube', + 'state': 'Initializing', + 'progress': 0.0, + 'size': file_size or 0, + 'transferred': 0, + 'speed': 0, + 'time_remaining': None, + 'video_id': video_id, + 'url': youtube_url, + 'title': title, + 'file_path': None, # Will be set when download completes + 'task': None # Will hold the background task + } + # Start download in background task + loop = asyncio.get_event_loop() + task = loop.create_task(self._download_internal(download_id, youtube_url, title)) + + # Store task reference + async with self._download_lock: + self.active_downloads[download_id]['task'] = task + + logger.info(f"✅ Download started with ID: {download_id}") + return download_id + + except Exception as e: + logger.error(f"❌ Failed to start download: {e}") + import traceback + traceback.print_exc() + return None + + async def _download_internal(self, download_id: str, youtube_url: str, title: str): + """ + Internal method to perform the actual YouTube download in the background. + + Args: + download_id: Unique download ID + youtube_url: YouTube video URL + title: Video title for display + """ try: - # Build download options - download_opts = self.download_opts.copy() - - # Get Spotify album details for proper folder structure and track numbering - track_number = 1 - disc_number = 1 - release_year = str(datetime.now().year) - album_artist = None - artist_genres = [] - - if spotify_track and spotify_track.id and not spotify_track.id.startswith('test'): - # Fetch full Spotify details to get track number, disc number, release date, genres + # Update state to downloading + async with self._download_lock: + if download_id in self.active_downloads: + self.active_downloads[download_id]['state'] = 'Downloading' + + # Set current download ID for progress tracking + self.current_download_id = download_id + + # Run yt-dlp download in thread pool (to avoid blocking event loop) + loop = asyncio.get_event_loop() + + def _download(): try: - from core.spotify_client import SpotifyClient - - spotify_client = SpotifyClient() - if spotify_client.is_authenticated(): - track_details = spotify_client.get_track_details(spotify_track.id) - if track_details: - track_number = track_details.get('track_number', 1) - disc_number = track_details.get('disc_number', 1) - - # Use album artist if available, otherwise use track artist - album_data = track_details.get('album', {}) - if album_data.get('artists'): - album_artist = album_data['artists'][0] - - # Get actual release year from Spotify - release_date = album_data.get('release_date', '') - if release_date: - release_year = release_date.split('-')[0] # Extract year from YYYY-MM-DD - - # Get artist genres (for metadata parity with Soulseek flow) - try: - primary_artist = track_details.get('primary_artist') - if primary_artist: - artist_info = spotify_client.get_artist(primary_artist) - if artist_info and hasattr(artist_info, 'genres'): - artist_genres = artist_info.genres - except: - pass - - logger.info(f" 📀 Spotify track #{track_number} on album: {spotify_track.album} ({release_year})") - except Exception as e: - logger.warning(f" ⚠️ Could not fetch Spotify track details: {e}") + # Use default download options + download_opts = self.download_opts.copy() - # If we have Spotify metadata, use production file organization - if spotify_track: - artist = spotify_track.artists[0] if spotify_track.artists else yt_result.parsed_artist - title = spotify_track.name - album = spotify_track.album + # Perform download + with yt_dlp.YoutubeDL(download_opts) as ydl: + info = ydl.extract_info(youtube_url, download=True) - # Use album artist if found, otherwise use track artist - if not album_artist: - album_artist = artist - - # Create folder structure: $albumartist/$albumartist - $album/ - album_folder = self.download_path / album_artist / f"{album_artist} - {album}" - album_folder.mkdir(parents=True, exist_ok=True) - - # File naming: $track - $title (production format) - final_filename = f"{track_number:02d} - {title}" - - # Sanitize filename (remove invalid characters) - final_filename = re.sub(r'[<>:"/\\|?*]', '', final_filename) - - # Override output template with production folder structure - download_opts['outtmpl'] = str(album_folder / f'{final_filename}.%(ext)s') - - logger.info(f" 📁 Album folder: {album_artist}/{album_artist} - {album}/") - logger.info(f" 📝 Filename: {final_filename}.mp3") - - # Add metadata postprocessor with Spotify info - download_opts['postprocessor_args'] = { - 'ffmpeg': [ - '-metadata', f'artist={artist}', - '-metadata', f'title={title}', - '-metadata', f'album={album}', - '-metadata', f'album_artist={album_artist}', - '-metadata', f'track={track_number}/{spotify_track.total_tracks if hasattr(spotify_track, "total_tracks") else track_number}', - '-metadata', f'disc={disc_number}', - '-metadata', f'date={release_year}', - '-metadata', 'comment=Downloaded via SoulSync (YouTube)', - ] - } + # Get final filename (will be MP3 after ffmpeg conversion) + filename = Path(ydl.prepare_filename(info)).with_suffix('.mp3') - # Perform download - with yt_dlp.YoutubeDL(download_opts) as ydl: - info = ydl.extract_info(yt_result.url, download=True) + if filename.exists(): + return str(filename) + else: + logger.error(f"❌ Download completed but file not found: {filename}") + return None - # Get final filename (will be MP3 after ffmpeg conversion) - filename = Path(ydl.prepare_filename(info)).with_suffix('.mp3') + except Exception as e: + logger.error(f"❌ Download failed in thread: {e}") + import traceback + traceback.print_exc() + return None - if filename.exists(): - logger.info(f"✅ Download successful: {filename}") + # Run download + file_path = await loop.run_in_executor(None, _download) - # Post-download: Enhance metadata with mutagen - album_art_url = self._enhance_metadata(str(filename), spotify_track, yt_result, track_number, disc_number, release_year, artist_genres) + if file_path: + # Mark download as completed + async with self._download_lock: + if download_id in self.active_downloads: + self.active_downloads[download_id]['state'] = 'Completed' + self.active_downloads[download_id]['progress'] = 100.0 + self.active_downloads[download_id]['file_path'] = file_path - # Save cover.jpg to album folder (like production) - if album_art_url and spotify_track: - self._save_cover_art(filename.parent, album_art_url) + logger.info(f"✅ Download {download_id} completed: {file_path}") + else: + # Mark as error + async with self._download_lock: + if download_id in self.active_downloads: + self.active_downloads[download_id]['state'] = 'Errored' - # Create .lrc lyrics file (like production) - if spotify_track: - self._create_lyrics_file(str(filename), spotify_track) + logger.error(f"❌ Download {download_id} failed") - return str(filename) - else: - logger.error(f"❌ Download completed but file not found: {filename}") - return None + except asyncio.CancelledError: + # Download was cancelled + async with self._download_lock: + if download_id in self.active_downloads: + self.active_downloads[download_id]['state'] = 'Cancelled' + + logger.info(f"⚠️ Download {download_id} cancelled") + raise except Exception as e: - logger.error(f"❌ Download failed: {e}") + # Download error + async with self._download_lock: + if download_id in self.active_downloads: + self.active_downloads[download_id]['state'] = 'Errored' + + logger.error(f"❌ Download {download_id} failed: {e}") import traceback traceback.print_exc() - return None + + finally: + # Clear current download ID + if self.current_download_id == download_id: + self.current_download_id = None + + async def get_all_downloads(self) -> List[DownloadStatus]: + """ + Get all active downloads (matches Soulseek interface). + + Returns: + List of DownloadStatus objects for all active downloads + """ + download_statuses = [] + + async with self._download_lock: + for download_id, download_info in self.active_downloads.items(): + status = DownloadStatus( + id=download_info['id'], + filename=download_info['filename'], + username=download_info['username'], + state=download_info['state'], + progress=download_info['progress'], + size=download_info['size'], + transferred=download_info['transferred'], + speed=download_info['speed'], + time_remaining=download_info.get('time_remaining') + ) + download_statuses.append(status) + + return download_statuses + + async def get_download_status(self, download_id: str) -> Optional[DownloadStatus]: + """ + Get status of a specific download (matches Soulseek interface). + + Args: + download_id: Download ID to query + + Returns: + DownloadStatus object or None if not found + """ + async with self._download_lock: + if download_id not in self.active_downloads: + return None + + download_info = self.active_downloads[download_id] + + return DownloadStatus( + id=download_info['id'], + filename=download_info['filename'], + username=download_info['username'], + state=download_info['state'], + progress=download_info['progress'], + size=download_info['size'], + transferred=download_info['transferred'], + speed=download_info['speed'], + time_remaining=download_info.get('time_remaining') + ) + + async def cancel_download(self, download_id: str, username: str = None, remove: bool = False) -> bool: + """ + Cancel an active download (matches Soulseek interface). + + Args: + download_id: Download ID to cancel + username: Ignored for YouTube (kept for interface compatibility) + remove: If True, remove from active downloads after cancelling + + Returns: + True if cancelled successfully, False otherwise + """ + try: + async with self._download_lock: + if download_id not in self.active_downloads: + logger.warning(f"⚠️ Download {download_id} not found") + return False + + download_info = self.active_downloads[download_id] + task = download_info.get('task') + + # Cancel the background task if it exists + if task and not task.done(): + task.cancel() + logger.info(f"⚠️ Cancelled download {download_id}") + + # Update state + download_info['state'] = 'Cancelled' + + # Remove from active downloads if requested + if remove: + del self.active_downloads[download_id] + logger.info(f"🗑️ Removed download {download_id} from queue") + + return True + + except Exception as e: + logger.error(f"❌ Failed to cancel download {download_id}: {e}") + return False def _enhance_metadata(self, filepath: str, spotify_track: Optional[SpotifyTrack], yt_result: YouTubeSearchResult, track_number: int = 1, disc_number: int = 1, release_year: str = None, artist_genres: list = None): """