""" YouTube Download Client Alternative music download source using yt-dlp and YouTube. This client provides: - YouTube search with metadata parsing - Production matching engine integration (same as Soulseek) - Full Spotify metadata enhancement - Automatic ffmpeg download and management - Album art and lyrics integration """ import sys import os import re import time import platform import asyncio import uuid import threading from typing import List, Optional, Dict, Any, Tuple from dataclasses import dataclass from pathlib import Path from datetime import datetime from enum import Enum try: import yt_dlp except ImportError: raise ImportError("yt-dlp is required. Install with: pip install yt-dlp") from utils.logging_config import get_logger from core.matching_engine import MusicMatchingEngine from core.spotify_client import Track as SpotifyTrack # Import Soulseek data structures for drop-in replacement compatibility from core.soulseek_client import SearchResult, TrackResult, AlbumResult, DownloadStatus logger = get_logger("youtube_client") @dataclass class YouTubeSearchResult: """YouTube search result with metadata parsing""" video_id: str title: str channel: str duration: int # seconds url: str thumbnail: str view_count: int upload_date: str # Parsed metadata parsed_artist: Optional[str] = None parsed_title: Optional[str] = None parsed_album: Optional[str] = None # Quality info available_quality: str = "unknown" best_audio_format: Optional[Dict] = None # Matching confidence confidence: float = 0.0 match_reason: str = "" def __post_init__(self): """Parse metadata from title""" self._parse_title_metadata() def _parse_title_metadata(self): """Extract artist and title from YouTube video title""" patterns = [ r'^(.+?)\s*[-–—]\s*(.+)$', # Artist - Title r'^(.+?)\s*:\s*(.+)$', # Artist: Title r'^(.+?)\s+by\s+(.+)$', # Title by Artist (reversed) ] for pattern in patterns: match = re.match(pattern, self.title, re.IGNORECASE) if match: if 'by' in pattern: self.parsed_title = 
match.group(1).strip() self.parsed_artist = match.group(2).strip() else: self.parsed_artist = match.group(1).strip() self.parsed_title = match.group(2).strip() return # Fallback: treat entire title as song title, channel as artist self.parsed_title = self.title self.parsed_artist = self.channel class YouTubeClient: """ YouTube download client using yt-dlp. Provides search, matching, and download capabilities with full Spotify metadata integration. """ def __init__(self, download_path: str = None): # Use Soulseek download path for consistency (post-processing expects files here) from config.settings import config_manager if download_path is None: download_path = config_manager.get('soulseek.download_path', './downloads') self.download_path = Path(download_path) self.download_path.mkdir(parents=True, exist_ok=True) logger.info(f"YouTube client using download path: {self.download_path}") # Callback for shutdown check (avoids circular imports) self.shutdown_check = None # Rate limiting — serialize YouTube downloads with delay self._download_semaphore = threading.Semaphore(1) self._download_delay = config_manager.get('youtube.download_delay', 3) self._last_download_time = 0 def set_shutdown_check(self, check_callable): """Set a callback function to check for system shutdown""" self.shutdown_check = check_callable # Initialize production matching engine for parity with Soulseek self.matching_engine = MusicMatchingEngine() logger.info("Initialized production MusicMatchingEngine") # Check for ffmpeg (REQUIRED for MP3 conversion) if not self._check_ffmpeg(): logger.error("ffmpeg is required but not found") logger.error("The client will attempt to auto-download ffmpeg on first use") # Download queue management (mirrors Soulseek's download tracking) # Maps download_id -> download_info dict self.active_downloads: Dict[str, Dict[str, Any]] = {} self._download_lock = threading.Lock() # Use threading.Lock for thread safety # Configure yt-dlp options with bot detection bypass 
self.download_opts = { 'format': 'bestaudio/best', 'outtmpl': str(self.download_path / '%(title)s.%(ext)s'), 'quiet': True, 'no_warnings': True, 'extract_flat': False, 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '320', }], 'progress_hooks': [self._progress_hook], # Track download progress 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', 'age_limit': None, # Don't skip age-restricted } # Cookie support — use browser cookies for YouTube auth from config.settings import config_manager cookies_browser = config_manager.get('youtube.cookies_browser', '') if cookies_browser: self.download_opts['cookiesfrombrowser'] = (cookies_browser,) # Track current download progress (mirrors Soulseek transfer tracking) self.current_download_id: Optional[str] = None self.current_download_progress = { 'status': 'idle', # idle, downloading, postprocessing, completed, error 'percent': 0.0, 'downloaded_bytes': 0, 'total_bytes': 0, 'speed': 0, # bytes/sec 'eta': 0, # seconds 'filename': '' } # Optional progress callback for UI updates self.progress_callback = None def is_available(self) -> bool: """ Check if YouTube client is available (yt-dlp installed and ffmpeg available). 
Returns: bool: True if YouTube downloads can work, False otherwise """ try: # Check yt-dlp import yt_dlp # Check ffmpeg (will auto-download if needed) ffmpeg_ok = self._check_ffmpeg() return ffmpeg_ok except ImportError: logger.error("yt-dlp is not installed") return False def reload_settings(self): """Reload YouTube settings from config (called when settings are saved).""" from config.settings import config_manager self._download_delay = config_manager.get('youtube.download_delay', 3) cookies_browser = config_manager.get('youtube.cookies_browser', '') if cookies_browser: self.download_opts['cookiesfrombrowser'] = (cookies_browser,) elif 'cookiesfrombrowser' in self.download_opts: del self.download_opts['cookiesfrombrowser'] # Reload download path new_path = Path(config_manager.get('soulseek.download_path', './downloads')) if new_path != self.download_path: self.download_path = new_path self.download_path.mkdir(parents=True, exist_ok=True) self.download_opts['outtmpl'] = str(self.download_path / '%(title)s.%(ext)s') logger.info(f"YouTube download path updated to: {self.download_path}") logger.info(f"YouTube settings reloaded (delay={self._download_delay}s, cookies={'enabled' if cookies_browser else 'disabled'})") async def check_connection(self) -> bool: """ Test if YouTube is accessible by attempting a lightweight API call (async, Soulseek-compatible). 
Returns: bool: True if YouTube is reachable, False otherwise """ try: # Run in executor to avoid blocking event loop loop = asyncio.get_event_loop() def _check(): ydl_opts = { 'quiet': True, 'no_warnings': True, 'extract_flat': True, # Don't download, just extract info 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', } with yt_dlp.YoutubeDL(ydl_opts) as ydl: # Try to extract info from a known video (YouTube's own channel trailer) # This is a lightweight test that doesn't download anything info = ydl.extract_info("https://www.youtube.com/watch?v=dQw4w9WgXcQ", download=False) return info is not None return await loop.run_in_executor(None, _check) except Exception as e: logger.error(f"YouTube connection check failed: {e}") return False def is_configured(self) -> bool: """ Check if YouTube client is configured and ready to use (matches Soulseek interface). YouTube doesn't require authentication or configuration like Soulseek, so this just checks if the client is available. Returns: bool: True if YouTube client is ready to use """ return self.is_available() def set_progress_callback(self, callback): """ Set a callback function for progress updates. Callback signature: callback(progress_dict) Progress dict contains: - status: 'idle', 'downloading', 'postprocessing', 'completed', 'error' - percent: 0.0-100.0 - downloaded_bytes: int - total_bytes: int - speed: bytes/sec - eta: estimated seconds remaining - filename: current file being processed """ self.progress_callback = callback def _progress_hook(self, d): """ yt-dlp progress hook - called during download to report progress. Updates the active_downloads dictionary for the current download. Mirrors Soulseek's transfer status updates. 
""" try: # Only update if we have a current download ID if not self.current_download_id: return status = d.get('status', 'unknown') if status == 'downloading': downloaded = d.get('downloaded_bytes', 0) total = d.get('total_bytes') or d.get('total_bytes_estimate', 0) speed = d.get('speed', 0) or 0 eta = d.get('eta', 0) or 0 if total > 0: percent = (downloaded / total) * 100 else: percent = 0 # Update active downloads dictionary (thread-safe update with lock) with self._download_lock: if self.current_download_id in self.active_downloads: download_info = self.active_downloads[self.current_download_id] download_info['state'] = 'InProgress, Downloading' # Match Soulseek state format download_info['progress'] = round(percent, 1) download_info['transferred'] = downloaded download_info['size'] = total download_info['speed'] = int(speed) download_info['time_remaining'] = int(eta) if eta > 0 else None # Also update current_download_progress for legacy compatibility self.current_download_progress = { 'status': 'downloading', 'percent': round(percent, 1), 'downloaded_bytes': downloaded, 'total_bytes': total, 'speed': int(speed), 'eta': int(eta), 'filename': d.get('filename', '') } # Call progress callback if set (for UI updates) if self.progress_callback: self.progress_callback(self.current_download_progress) elif status == 'finished': # Download finished, ffmpeg is converting to MP3 # Keep state as 'InProgress, Downloading' - the download thread will set final state with self._download_lock: if self.current_download_id in self.active_downloads: self.active_downloads[self.current_download_id]['progress'] = 95.0 # Almost done (converting) self.current_download_progress['status'] = 'postprocessing' self.current_download_progress['percent'] = 95.0 if self.progress_callback: self.progress_callback(self.current_download_progress) elif status == 'error': # Mark as error (thread-safe) with self._download_lock: if self.current_download_id in self.active_downloads: 
self.active_downloads[self.current_download_id]['state'] = 'Errored' self.current_download_progress['status'] = 'error' if self.progress_callback: self.progress_callback(self.current_download_progress) except Exception as e: logger.debug(f"Progress hook error: {e}") def get_download_progress(self) -> dict: """ Get current download progress (mirrors Soulseek's get_download_status). Returns: Dict with progress information (status, percent, speed, etc.) """ return self.current_download_progress.copy() def _check_ffmpeg(self) -> bool: """Check if ffmpeg is available (system PATH or auto-download to tools folder)""" import shutil import urllib.request import zipfile import tarfile # Check if ffmpeg is in system PATH if shutil.which('ffmpeg'): logger.info("Found ffmpeg in system PATH") return True # Auto-download ffmpeg to tools folder if not found tools_dir = Path(__file__).parent.parent / 'tools' tools_dir.mkdir(exist_ok=True) system = platform.system().lower() if system == 'windows': ffmpeg_path = tools_dir / 'ffmpeg.exe' ffprobe_path = tools_dir / 'ffprobe.exe' else: ffmpeg_path = tools_dir / 'ffmpeg' ffprobe_path = tools_dir / 'ffprobe' # If we already have both locally, use them if ffmpeg_path.exists() and ffprobe_path.exists(): logger.info(f"Found ffmpeg and ffprobe in tools folder") # Add to PATH so yt-dlp can find them tools_dir_str = str(tools_dir.absolute()) os.environ['PATH'] = tools_dir_str + os.pathsep + os.environ.get('PATH', '') return True # Auto-download ffmpeg binary logger.info(f"⬇️ ffmpeg not found - downloading for {system}...") try: if system == 'windows': # Download Windows ffmpeg (static build) url = 'https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-win64-gpl.zip' zip_path = tools_dir / 'ffmpeg.zip' logger.info(f" Downloading from GitHub (this may take a minute)...") urllib.request.urlretrieve(url, zip_path) logger.info(f" Extracting ffmpeg.exe and ffprobe.exe...") with zipfile.ZipFile(zip_path, 'r') as zip_ref: 
# Extract ffmpeg.exe and ffprobe.exe from the bin folder for file in zip_ref.namelist(): if file.endswith('bin/ffmpeg.exe'): with zip_ref.open(file) as source, open(tools_dir / 'ffmpeg.exe', 'wb') as target: target.write(source.read()) elif file.endswith('bin/ffprobe.exe'): with zip_ref.open(file) as source, open(tools_dir / 'ffprobe.exe', 'wb') as target: target.write(source.read()) zip_path.unlink() # Clean up zip elif system == 'linux': # Download Linux ffmpeg (static build) url = 'https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-linux64-gpl.tar.xz' tar_path = tools_dir / 'ffmpeg.tar.xz' logger.info(f" Downloading from GitHub (this may take a minute)...") urllib.request.urlretrieve(url, tar_path) logger.info(f" Extracting ffmpeg and ffprobe...") with tarfile.open(tar_path, 'r:xz') as tar_ref: for member in tar_ref.getmembers(): if member.name.endswith('bin/ffmpeg'): with tar_ref.extractfile(member) as source, open(tools_dir / 'ffmpeg', 'wb') as target: target.write(source.read()) (tools_dir / 'ffmpeg').chmod(0o755) # Make executable elif member.name.endswith('bin/ffprobe'): with tar_ref.extractfile(member) as source, open(tools_dir / 'ffprobe', 'wb') as target: target.write(source.read()) (tools_dir / 'ffprobe').chmod(0o755) # Make executable tar_path.unlink() # Clean up tar elif system == 'darwin': # Download Mac ffmpeg and ffprobe (static builds) logger.info(f" Downloading ffmpeg from evermeet.cx...") ffmpeg_url = 'https://evermeet.cx/ffmpeg/getrelease/zip' ffmpeg_zip = tools_dir / 'ffmpeg.zip' urllib.request.urlretrieve(ffmpeg_url, ffmpeg_zip) logger.info(f" Downloading ffprobe from evermeet.cx...") ffprobe_url = 'https://evermeet.cx/ffmpeg/getrelease/ffprobe/zip' ffprobe_zip = tools_dir / 'ffprobe.zip' urllib.request.urlretrieve(ffprobe_url, ffprobe_zip) logger.info(f" Extracting ffmpeg and ffprobe...") with zipfile.ZipFile(ffmpeg_zip, 'r') as zip_ref: zip_ref.extract('ffmpeg', tools_dir) with zipfile.ZipFile(ffprobe_zip, 
'r') as zip_ref: zip_ref.extract('ffprobe', tools_dir) (tools_dir / 'ffmpeg').chmod(0o755) # Make executable (tools_dir / 'ffprobe').chmod(0o755) # Make executable ffmpeg_zip.unlink() # Clean up zip ffprobe_zip.unlink() # Clean up zip else: logger.error(f"Unsupported platform: {system}") return False logger.info(f"Downloaded ffmpeg to: {ffmpeg_path}") # Add to PATH tools_dir_str = str(tools_dir.absolute()) os.environ['PATH'] = tools_dir_str + os.pathsep + os.environ.get('PATH', '') return True except Exception as e: logger.error(f"Failed to download ffmpeg: {e}") logger.error(f" Please install manually:") logger.error(f" Windows: scoop install ffmpeg") logger.error(f" Linux: sudo apt install ffmpeg") logger.error(f" Mac: brew install ffmpeg") return False def _youtube_to_track_result(self, entry: dict, best_audio: Optional[dict] = None) -> TrackResult: """ Convert YouTube video entry to TrackResult (Soulseek-compatible format). This is the adapter layer that allows YouTube client to speak Soulseek's language. Args: entry: YouTube video entry from yt-dlp best_audio: Best audio format info (optional) Returns: TrackResult object compatible with Soulseek interface """ # Parse artist and title from YouTube video title title = entry.get('title', '') artist = None track_title = title # Common YouTube title patterns: "Artist - Title", "Artist: Title", etc. 
patterns = [ r'^(.+?)\s*[-–—]\s*(.+)$', # Artist - Title r'^(.+?)\s*:\s*(.+)$', # Artist: Title r'^(.+?)\s+by\s+(.+)$', # Title by Artist (reversed) ] for pattern in patterns: match = re.match(pattern, title, re.IGNORECASE) if match: if 'by' in pattern: track_title = match.group(1).strip() artist = match.group(2).strip() else: artist = match.group(1).strip() track_title = match.group(2).strip() break # Fallback: use uploader/channel as artist if not artist: artist = entry.get('uploader', entry.get('channel', 'Unknown Artist')) # Strip YouTube auto-generated "- Topic" suffix from channel names if artist and re.search(r'\s*-\s*Topic\s*$', artist, re.IGNORECASE): artist = re.sub(r'\s*-\s*Topic\s*$', '', artist, flags=re.IGNORECASE).strip() # Extract file size (estimate from format) file_size = 0 if best_audio and 'filesize' in best_audio: file_size = best_audio.get('filesize', 0) or best_audio.get('filesize_approx', 0) or 0 # Extract bitrate bitrate = None if best_audio: bitrate = int(best_audio.get('abr', best_audio.get('tbr', 0))) # Duration in milliseconds (Soulseek uses ms) duration_ms = int(entry.get('duration', 0) * 1000) if entry.get('duration') else None # Quality string quality_str = self._format_quality_string(best_audio) if best_audio else "unknown" # Video URL as filename (we'll use this to identify the track later) video_id = entry.get('id', '') filename = f"{video_id}||{title}" # Store video_id and title for later download track_result = TrackResult( username="youtube", # YouTube doesn't have users - use constant filename=filename, size=file_size, bitrate=bitrate, duration=duration_ms, quality="mp3", # We always convert to MP3 free_upload_slots=999, # YouTube always available upload_speed=999999, # High speed indicator queue_length=0, # No queue for YouTube artist=artist, title=track_title, album=None, # YouTube videos don't have album info (will be added from Spotify) track_number=None ) # Add thumbnail for frontend (surgical addition) # In fast mode 
(extract_flat), 'thumbnail' might be missing, but 'thumbnails' list exists thumbnail = entry.get('thumbnail') if not thumbnail and entry.get('thumbnails'): # Pick the last thumbnail (usually highest quality) thumbs = entry.get('thumbnails') if isinstance(thumbs, list) and thumbs: thumbnail = thumbs[-1].get('url') track_result.thumbnail = thumbnail return track_result async def search_videos(self, query: str, max_results: int = 20) -> List[YouTubeSearchResult]: """Search YouTube and return video metadata for music video display. Unlike search() which returns TrackResult objects for download matching, this returns YouTubeSearchResult objects with video-specific metadata (thumbnails, view counts, channel names) for UI display. """ logger.info(f"Searching YouTube videos for: {query}") try: loop = asyncio.get_event_loop() def _search(): from config.settings import config_manager ydl_opts = { 'quiet': True, 'no_warnings': True, 'extract_flat': True, 'default_search': 'ytsearch', 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', } cookies_browser = config_manager.get('youtube.cookies_browser', '') if cookies_browser: ydl_opts['cookiesfrombrowser'] = (cookies_browser,) with yt_dlp.YoutubeDL(ydl_opts) as ydl: data = ydl.extract_info(f"ytsearch{max_results}:{query}", download=False) if not data or 'entries' not in data: return [] results = [] for entry in data['entries']: if not entry: continue video_id = entry.get('id', '') title = entry.get('title', '') if not video_id or not title: continue # Skip very short clips (< 30s) and very long content (> 15min) duration = entry.get('duration') or 0 if duration < 30 or duration > 900: continue channel = entry.get('uploader', entry.get('channel', '')) if channel and re.search(r'\s*-\s*Topic\s*$', channel, re.IGNORECASE): channel = re.sub(r'\s*-\s*Topic\s*$', '', channel, flags=re.IGNORECASE).strip() thumbnail = entry.get('thumbnail') if not thumbnail and entry.get('thumbnails'): thumbs = 
entry['thumbnails'] if isinstance(thumbs, list) and thumbs: thumbnail = thumbs[-1].get('url') results.append(YouTubeSearchResult( video_id=video_id, title=title, channel=channel, duration=duration, url=f"https://www.youtube.com/watch?v={video_id}", thumbnail=thumbnail or '', view_count=entry.get('view_count', 0) or 0, upload_date=entry.get('upload_date', ''), )) return results return await loop.run_in_executor(None, _search) except Exception as e: logger.error(f"YouTube video search failed: {e}") return [] async def search(self, query: str, timeout: int = None, progress_callback=None) -> tuple[List[TrackResult], List[AlbumResult]]: """ Search YouTube for tracks matching the query (async, Soulseek-compatible interface). Args: query: Search query (e.g., "Artist Name - Song Title") timeout: Ignored for YouTube (kept for interface compatibility) progress_callback: Optional callback for progress updates Returns: Tuple of (track_results, album_results). Album results will always be empty for YouTube. 
""" logger.info(f"Searching YouTube for: {query}") try: # Run yt-dlp in executor to avoid blocking event loop loop = asyncio.get_event_loop() def _search(): from config.settings import config_manager ydl_opts = { 'quiet': True, 'no_warnings': True, 'extract_flat': True, # Fast mode: Don't fetch formats (massive speedup) 'default_search': 'ytsearch', 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', } # Add cookie support for search (avoids bot detection) cookies_browser = config_manager.get('youtube.cookies_browser', '') if cookies_browser: ydl_opts['cookiesfrombrowser'] = (cookies_browser,) with yt_dlp.YoutubeDL(ydl_opts) as ydl: # Search YouTube (max 50 results) search_results = ydl.extract_info(f"ytsearch50:{query}", download=False) if not search_results or 'entries' not in search_results: return [] return search_results['entries'] # Run search in thread pool entries = await loop.run_in_executor(None, _search) if not entries: logger.warning(f"No YouTube results found for: {query}") return ([], []) # Convert to TrackResult objects track_results = [] for entry in entries: if not entry: continue # Get best audio format info best_audio = self._get_best_audio_format(entry.get('formats', [])) # Convert to TrackResult (Soulseek format) track_result = self._youtube_to_track_result(entry, best_audio) track_results.append(track_result) logger.info(f"Found {len(track_results)} YouTube tracks") # Return tuple: (tracks, albums) - YouTube doesn't have albums, so return empty list return (track_results, []) except Exception as e: logger.error(f"YouTube search failed: {e}") import traceback traceback.print_exc() return ([], []) def _get_best_audio_format(self, formats: List[Dict]) -> Optional[Dict]: """Extract best audio format from available formats""" if not formats: return None # Filter for audio-only formats audio_formats = [f for f in formats if f.get('vcodec') == 'none' and f.get('acodec') != 
'none'] if not audio_formats: return None # Sort by audio bitrate (tbr = total bitrate, abr = audio bitrate) audio_formats.sort(key=lambda f: f.get('abr', f.get('tbr', 0)), reverse=True) return audio_formats[0] def _format_quality_string(self, audio_format: Optional[Dict]) -> str: """Format quality info string""" if not audio_format: return "unknown" abr = audio_format.get('abr', audio_format.get('tbr', 0)) acodec = audio_format.get('acodec', 'unknown') if abr: return f"{int(abr)}kbps {acodec.upper()}" return acodec.upper() def calculate_match_confidence(self, spotify_track: SpotifyTrack, yt_result: YouTubeSearchResult) -> Tuple[float, str]: """ Calculate match confidence using PRODUCTION matching engine for parity with Soulseek. Returns: (confidence_score, match_reason) tuple """ # Use production matching engine's normalization and similarity scoring spotify_artist = spotify_track.artists[0] if spotify_track.artists else "" yt_artist = yt_result.parsed_artist or yt_result.channel # Normalize using production engine spotify_artist_clean = self.matching_engine.clean_artist(spotify_artist) yt_artist_clean = self.matching_engine.clean_artist(yt_artist) spotify_title_clean = self.matching_engine.clean_title(spotify_track.name) yt_title_clean = self.matching_engine.clean_title(yt_result.parsed_title) # Use production similarity_score (includes version detection, remaster penalties, etc.) 
artist_similarity = self.matching_engine.similarity_score(spotify_artist_clean, yt_artist_clean) title_similarity = self.matching_engine.similarity_score(spotify_title_clean, yt_title_clean) # Duration matching using production engine spotify_duration_ms = spotify_track.duration_ms yt_duration_ms = int(yt_result.duration * 1000) # Convert seconds to ms duration_similarity = self.matching_engine.duration_similarity(spotify_duration_ms, yt_duration_ms) # Quality penalty (YouTube-specific) quality_score = self._quality_score(yt_result.available_quality) # Weighted confidence calculation (similar to production Soulseek matching) # Production uses: title * 0.5 + artist * 0.3 + duration * 0.2 # Adjusted for YouTube: title * 0.4 + artist * 0.3 + duration * 0.2 + quality * 0.1 confidence = ( title_similarity * 0.40 + artist_similarity * 0.30 + duration_similarity * 0.20 + quality_score * 0.10 ) # Determine match reason if confidence >= 0.8: reason = "excellent_match" elif confidence >= 0.65: reason = "good_match" elif confidence >= 0.58: # Match production threshold reason = "acceptable_match" else: reason = "poor_match" # Bonus for official channels/verified if 'vevo' in yt_artist.lower() or 'official' in yt_result.channel.lower(): confidence = min(1.0, confidence + 0.05) reason += "_official" logger.debug(f"Match confidence: {confidence:.2f} | Artist: {artist_similarity:.2f} | Title: {title_similarity:.2f} | Duration: {duration_similarity:.2f} | Quality: {quality_score:.2f}") return confidence, reason def _quality_score(self, quality_str: str) -> float: """Score quality string (mirrors quality_score logic)""" quality_lower = quality_str.lower() # Extract bitrate bitrate_match = re.search(r'(\d+)kbps', quality_lower) if bitrate_match: bitrate = int(bitrate_match.group(1)) # Scoring based on bitrate if bitrate >= 256: return 1.0 elif bitrate >= 192: return 0.8 elif bitrate >= 128: return 0.6 else: return 0.4 # Codec-based scoring if no bitrate if 'opus' in quality_lower: 
return 0.9 elif 'aac' in quality_lower: return 0.7 elif 'mp3' in quality_lower: return 0.7 return 0.5 # Unknown quality def find_best_matches(self, spotify_track: SpotifyTrack, yt_results: List[YouTubeSearchResult], min_confidence: float = 0.58) -> List[YouTubeSearchResult]: """ Find best YouTube matches for Spotify track (mirrors find_best_slskd_matches). Uses production threshold of 0.58 for parity with Soulseek matching. Args: spotify_track: Spotify track to match yt_results: YouTube search results min_confidence: Minimum confidence threshold (default: 0.58, same as production) Returns: Sorted list of matches above confidence threshold """ matches = [] for yt_result in yt_results: confidence, reason = self.calculate_match_confidence(spotify_track, yt_result) yt_result.confidence = confidence yt_result.match_reason = reason if confidence >= min_confidence: matches.append(yt_result) # Sort by confidence (best first) matches.sort(key=lambda r: r.confidence, reverse=True) logger.info(f"Found {len(matches)} matches above {min_confidence} confidence") return matches async def download(self, username: str, filename: str, file_size: int = 0) -> Optional[str]: """ Download YouTube video as audio (async, Soulseek-compatible interface). Returns download_id immediately and runs download in background thread. Monitor via get_download_status() or get_all_downloads(). 
Args: username: Ignored for YouTube (always "youtube") filename: Encoded as "video_id||title" from search results file_size: Ignored for YouTube (kept for interface compatibility) Returns: download_id: Unique ID for tracking this download """ try: # Parse filename to extract video_id if '||' not in filename: logger.error(f"Invalid filename format: {filename}") return None video_id, title = filename.split('||', 1) youtube_url = f"https://www.youtube.com/watch?v={video_id}" logger.info(f"Starting YouTube download: {title}") logger.info(f" URL: {youtube_url}") # Create unique download ID download_id = str(uuid.uuid4()) # Initialize download info in active downloads with self._download_lock: self.active_downloads[download_id] = { 'id': download_id, 'filename': filename, # Keep original encoded format for context matching! 'username': 'youtube', 'state': 'Initializing', # Soulseek-style states 'progress': 0.0, 'size': file_size or 0, 'transferred': 0, 'speed': 0, 'time_remaining': None, 'video_id': video_id, 'url': youtube_url, 'title': title, 'file_path': None, # Will be set when download completes } # Start download in background thread (returns immediately) download_thread = threading.Thread( target=self._download_thread_worker, args=(download_id, youtube_url, title, filename), daemon=True ) download_thread.start() logger.info(f"YouTube download {download_id} started in background") return download_id except Exception as e: logger.error(f"Failed to start YouTube download: {e}") import traceback traceback.print_exc() return None def _download_thread_worker(self, download_id: str, youtube_url: str, title: str, original_filename: str): """ Background thread worker for downloading YouTube videos. Updates active_downloads dict with progress. Serialized via semaphore with configurable delay between downloads. 
""" try: with self._download_semaphore: # Enforce delay since last download completed elapsed = time.time() - self._last_download_time if self._last_download_time > 0 and elapsed < self._download_delay: wait_time = self._download_delay - elapsed logger.info(f"Rate limiting: waiting {wait_time:.1f}s before next YouTube download") time.sleep(wait_time) # Update state to downloading with self._download_lock: if download_id in self.active_downloads: self.active_downloads[download_id]['state'] = 'InProgress, Downloading' # Match Soulseek state # Set current download ID for progress hook self.current_download_id = download_id # Perform actual download file_path = self._download_sync(youtube_url, title) # Clear current download ID self.current_download_id = None # Record completion time for rate limiting self._last_download_time = time.time() if file_path: # Mark as completed/succeeded (match Soulseek state) with self._download_lock: if download_id in self.active_downloads: # IMPORTANT: Keep original filename for context lookup! 
# The filename must match what was used to create the context entry # We store the actual file path separately self.active_downloads[download_id]['state'] = 'Completed, Succeeded' # Match Soulseek self.active_downloads[download_id]['progress'] = 100.0 self.active_downloads[download_id]['file_path'] = file_path # DO NOT update filename - keep original_filename for context matching logger.info(f"YouTube download {download_id} completed: {file_path}") else: # Mark as errored with self._download_lock: if download_id in self.active_downloads: self.active_downloads[download_id]['state'] = 'Errored' logger.error(f"YouTube download {download_id} failed") except Exception as e: logger.error(f"YouTube download thread failed for {download_id}: {e}") import traceback traceback.print_exc() # Mark as errored with self._download_lock: if download_id in self.active_downloads: self.active_downloads[download_id]['state'] = 'Errored' # Clear current download ID if self.current_download_id == download_id: self.current_download_id = None def _download_sync(self, youtube_url: str, title: str) -> Optional[str]: """ Synchronous download method (runs in thread pool executor). 
Args: youtube_url: YouTube video URL title: Video title for display Returns: File path if successful, None otherwise """ try: max_retries = 3 for attempt in range(max_retries): # Check for server shutdown using callback if self.shutdown_check and self.shutdown_check(): logger.info(f"Server shutting down, aborting download attempt {attempt + 1}") return None try: # Use default download options download_opts = self.download_opts.copy() # Force best audio format to prevent 'Requested format not available' errors download_opts['format'] = 'bestaudio/best' download_opts['noplaylist'] = True # On retry, try different strategies if attempt == 1: # Drop browser cookies — authenticated sessions sometimes get restricted formats if 'cookiesfrombrowser' in download_opts: logger.info(f"Retry {attempt + 1}/{max_retries} without browser cookies") download_opts.pop('cookiesfrombrowser', None) else: logger.info(f"Retry {attempt + 1}/{max_retries} with web_creator client") download_opts['extractor_args'] = { 'youtube': { 'player_client': ['web_creator'] } } elif attempt >= 2: logger.info(f"Retry {attempt + 1}/{max_retries} with 'best' format (video fallback)") download_opts['format'] = 'best' download_opts.pop('cookiesfrombrowser', None) download_opts.pop('extractor_args', None) # Perform download with yt_dlp.YoutubeDL(download_opts) as ydl: info = ydl.extract_info(youtube_url, download=True) # Get final filename (will be MP3 after ffmpeg conversion) filename = Path(ydl.prepare_filename(info)).with_suffix('.mp3') if filename.exists(): return str(filename) else: logger.error(f"Download completed but file not found: {filename}") if attempt < max_retries - 1: continue # Retry return None except Exception as e: error_msg = str(e) logger.error(f"Download attempt {attempt + 1} failed: {error_msg}") # Check if it's a 403 error if '403' in error_msg or 'Forbidden' in error_msg: if attempt < max_retries - 1: logger.info(f"Waiting 2 seconds before retry...") import time time.sleep(2) continue 
def download_music_video(self, video_url: str, output_path: str, progress_callback=None) -> Optional[str]:
    """Download a YouTube video as a music video file (keeps video, not audio-only).

    Args:
        video_url: YouTube video URL
        output_path: Full path for the output file (without extension — yt-dlp adds it)
        progress_callback: Optional callback(percent: float) for progress updates

    Returns:
        Final file path if successful, None otherwise
    """
    try:
        import glob

        from config.settings import config_manager

        def _progress_hook(d):
            # Translate yt-dlp's byte counters into a 0-100 percentage.
            if not progress_callback or d.get('status') != 'downloading':
                return
            total = d.get('total_bytes') or d.get('total_bytes_estimate') or 0
            # The key can be present but None; treat that as "nothing yet".
            downloaded = d.get('downloaded_bytes') or 0
            if total > 0:
                progress_callback(downloaded / total * 100)

        download_opts = {
            'quiet': True,
            'no_warnings': True,
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
            'merge_output_format': 'mp4',
            # '%' introduces a field in yt-dlp output templates, so a literal
            # percent sign in the caller's path has to be doubled.
            'outtmpl': output_path.replace('%', '%%') + '.%(ext)s',
            'noplaylist': True,
            'progress_hooks': [_progress_hook],
            'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        }

        cookies_browser = config_manager.get('youtube.cookies_browser', '')
        if cookies_browser:
            download_opts['cookiesfrombrowser'] = (cookies_browser,)

        with yt_dlp.YoutubeDL(download_opts) as ydl:
            info = ydl.extract_info(video_url, download=True)
            final_path = Path(ydl.prepare_filename(info))

        # yt-dlp may have merged to mp4
        mp4_path = final_path.with_suffix('.mp4')
        if mp4_path.exists():
            return str(mp4_path)

        if final_path.exists():
            return str(final_path)

        # Check for any file matching the stem. The stem is escaped because
        # titles routinely contain glob metacharacters such as "[...]".
        pattern = f"{glob.escape(final_path.stem)}.*"
        for candidate in final_path.parent.glob(pattern):
            if candidate.suffix in ('.mp4', '.mkv', '.webm'):
                return str(candidate)

        logger.error(f"Music video download completed but file not found: {final_path}")
        return None

    except Exception as e:
        logger.error(f"Music video download failed: {e}")
        import traceback
        traceback.print_exc()
        return None
async def get_all_downloads(self) -> List[DownloadStatus]:
    """
    Get all active downloads (matches Soulseek interface).

    Returns:
        List of DownloadStatus objects for all active downloads
    """
    # Snapshot under the lock so the dict is not mutated while we iterate.
    with self._download_lock:
        return [
            self._build_download_status(download_info)
            for download_info in self.active_downloads.values()
        ]

@staticmethod
def _build_download_status(download_info: Dict[str, Any]) -> DownloadStatus:
    """Convert one ``active_downloads`` entry into a DownloadStatus.

    Shared by get_all_downloads and get_download_status so both report the
    same fields (including ``file_path``) for the same entry.
    """
    return DownloadStatus(
        id=download_info['id'],
        filename=download_info['filename'],
        username=download_info['username'],
        state=download_info['state'],
        progress=download_info['progress'],
        size=download_info['size'],
        transferred=download_info['transferred'],
        speed=download_info['speed'],
        time_remaining=download_info.get('time_remaining'),
        file_path=download_info.get('file_path'),
    )

async def get_download_status(self, download_id: str) -> Optional[DownloadStatus]:
    """
    Get status of a specific download (matches Soulseek interface).

    Args:
        download_id: Download ID to query

    Returns:
        DownloadStatus object or None if not found
    """
    with self._download_lock:
        if download_id not in self.active_downloads:
            return None
        return self._build_download_status(self.active_downloads[download_id])

async def clear_all_completed_downloads(self) -> bool:
    """
    Clear all terminal (completed, cancelled, errored) downloads from the list.
    Matches Soulseek interface.

    Returns:
        True once the sweep has run, False if it raised.
    """
    # Exact state strings treated as terminal by this sweep.
    terminal_states = ('Completed, Succeeded', 'Cancelled', 'Errored', 'Aborted')
    try:
        with self._download_lock:
            # Collect first, delete second: the dict must not change size
            # while it is being iterated.
            finished_ids = [
                download_id
                for download_id, info in self.active_downloads.items()
                if info.get('state', '') in terminal_states
            ]
            for download_id in finished_ids:
                del self.active_downloads[download_id]
                logger.debug(f"Cleared finished download {download_id}")
        return True
    except Exception as e:
        logger.error(f"Error clearing downloads: {e}")
        return False

async def cancel_download(self, download_id: str, username: str = None, remove: bool = False) -> bool:
    """
    Cancel an active download (matches Soulseek interface).

    NOTE: YouTube downloads cannot be truly cancelled mid-download,
    but we mark them as cancelled for UI consistency.

    Args:
        download_id: Download ID to cancel
        username: Ignored for YouTube (kept for interface compatibility)
        remove: If True, remove from active downloads after cancelling

    Returns:
        True if cancelled successfully, False otherwise
    """
    try:
        with self._download_lock:
            downloads = self.active_downloads
            if download_id not in downloads:
                logger.warning(f"Download {download_id} not found")
                return False

            # Only the bookkeeping entry is updated here.
            downloads[download_id]['state'] = 'Cancelled'
            logger.info(f"Marked YouTube download {download_id} as cancelled")

            if remove:
                del downloads[download_id]
                logger.info(f"Removed YouTube download {download_id} from queue")

        return True
    except Exception as e:
        logger.error(f"Failed to cancel download {download_id}: {e}")
        return False
def _enhance_metadata(self, filepath: str, spotify_track: Optional[SpotifyTrack], yt_result: YouTubeSearchResult, track_number: int = 1, disc_number: int = 1, release_year: str = None, artist_genres: list = None):
    """
    Enhance MP3 metadata using mutagen + Spotify album art (mirrors main app's metadata enhancement).
    Uses full Spotify metadata including disc number, actual release year, and genre tags.

    Existing tag frames are always cleared. New frames are written only when
    ``spotify_track`` is provided.

    Args:
        filepath: Path to the MP3 file to tag
        spotify_track: Spotify track supplying title/artist/album, or None
        yt_result: YouTube result; its URL and confidence go into the comment tag
        track_number: Track number written to TRCK
        disc_number: Disc number written to TPOS
        release_year: Year written to TDRC; the current year is used when falsy
        artist_genres: Genre names; up to the first three are written to TCON

    Returns:
        Album art URL (for cover.jpg creation) when one was found, otherwise
        None. Also None when mutagen is missing or tagging failed.
    """
    # Import mutagen on its own so that only a missing mutagen produces the
    # "mutagen not installed" message.
    try:
        from mutagen.mp3 import MP3
        from mutagen.id3 import TIT2, TPE1, TALB, TDRC, COMM, APIC, TRCK, TPE2, TPOS, TCON
    except ImportError:
        logger.warning("mutagen not installed - skipping enhanced metadata tagging")
        logger.warning(" Install with: pip install mutagen")
        return None

    # Defined up front so the final return is valid on every path.
    album_art_url = None

    try:
        logger.info(f"Enhancing metadata for: {Path(filepath).name}")

        # Load MP3 file
        audio = MP3(filepath)

        # Clear ALL existing tags and start fresh
        if audio.tags is not None:
            audio.tags.clear()
            logger.debug(" Cleared all existing tag frames")
        else:
            # No tags exist, add them
            audio.add_tags()
            logger.debug(" Added new tag structure")

        if spotify_track:
            # Use Spotify metadata
            artist = spotify_track.artists[0] if spotify_track.artists else "Unknown Artist"
            title = spotify_track.name
            album = spotify_track.album
            year = release_year or str(datetime.now().year)

            # Album artist defaults to the track artist; a Spotify lookup may
            # refine it. The lookup is best-effort and must not abort tagging.
            album_artist = artist
            try:
                if spotify_track.id and not spotify_track.id.startswith('test'):
                    from core.spotify_client import SpotifyClient
                    spotify_client = SpotifyClient()
                    if spotify_client.is_authenticated():
                        track_details = spotify_client.get_track_details(spotify_track.id)
                        if track_details:
                            album_data = track_details.get('album', {})
                            if album_data.get('artists'):
                                album_artist = album_data['artists'][0]
            except Exception as lookup_error:
                logger.debug(f" Album artist lookup failed, using track artist: {lookup_error}")

            logger.debug(" Setting metadata tags...")

            # Set ID3 tags (using setall to ensure they're set)
            audio.tags.setall('TIT2', [TIT2(encoding=3, text=title)])
            audio.tags.setall('TPE1', [TPE1(encoding=3, text=artist)])
            audio.tags.setall('TPE2', [TPE2(encoding=3, text=album_artist)])  # Album artist
            audio.tags.setall('TALB', [TALB(encoding=3, text=album)])
            audio.tags.setall('TRCK', [TRCK(encoding=3, text=str(track_number))])  # Track number
            audio.tags.setall('TPOS', [TPOS(encoding=3, text=str(disc_number))])  # Disc number
            audio.tags.setall('TDRC', [TDRC(encoding=3, text=year)])

            # Genre (from Spotify artist data - matches production flow):
            # combine up to 3 genres; a single genre joins to itself.
            if artist_genres:
                genre = ', '.join(artist_genres[:3])
                audio.tags.setall('TCON', [TCON(encoding=3, text=genre)])
                logger.debug(f" Genre: {genre}")

            audio.tags.setall('COMM', [COMM(encoding=3, lang='eng', desc='',
                                            text=f'Downloaded via SoulSync (YouTube)\nSource: {yt_result.url}\nConfidence: {yt_result.confidence:.2f}')])

            logger.debug(f" Artist: {artist}")
            logger.debug(f" Album Artist: {album_artist}")
            logger.debug(f" Title: {title}")
            logger.debug(f" Album: {album}")
            logger.debug(f" Track #: {track_number}")
            logger.debug(f" Disc #: {disc_number}")
            logger.debug(f" Year: {year}")

            # Fetch and embed album art from Spotify (via search)
            logger.debug(" Fetching album art from Spotify...")
            album_art_url = self._get_spotify_album_art(spotify_track)

            if album_art_url:
                try:
                    # Imported here so a missing requests only skips the art.
                    import requests

                    # Download album art
                    response = requests.get(album_art_url, timeout=10)
                    response.raise_for_status()

                    # Determine image type; anything not clearly PNG is JPEG.
                    content_type = response.headers.get('Content-Type', '')
                    if 'png' in content_type and 'jpeg' not in content_type:
                        mime_type = 'image/png'
                    else:
                        mime_type = 'image/jpeg'

                    # Embed album art
                    audio.tags.add(APIC(
                        encoding=3,
                        mime=mime_type,
                        type=3,  # Cover (front)
                        desc='Cover',
                        data=response.content
                    ))

                    logger.debug(f" Album art embedded ({len(response.content) // 1024} KB)")
                except Exception as art_error:
                    logger.warning(f" Could not embed album art: {art_error}")
            else:
                logger.warning(" No album art found on Spotify")

        # Save all tags
        audio.save()

        logger.info("Metadata enhanced successfully")

        # Return album art URL for cover.jpg creation
        return album_art_url

    except Exception as e:
        logger.warning(f"Could not enhance metadata: {e}")
        return None
def _get_spotify_album_art(self, spotify_track: SpotifyTrack) -> Optional[str]:
    """Get album art URL from Spotify API.

    Searches Spotify albums for "<first artist> <album>" and returns the
    ``image_url`` of the first hit.

    Args:
        spotify_track: Track whose artist and album name drive the search

    Returns:
        Image URL, or None when the client is not authenticated, there is
        nothing to search for, nothing is found, or the lookup fails.
    """
    try:
        from core.spotify_client import SpotifyClient
        spotify_client = SpotifyClient()

        if not spotify_client.is_authenticated():
            return None

        # Build the query only from values that exist. A missing album must
        # not become the text "None", and an empty artist list must not
        # abort the lookup.
        artists = spotify_track.artists
        first_artist = artists[0] if artists else None
        query = ' '.join(str(part) for part in (first_artist, spotify_track.album) if part)
        if not query:
            return None

        # Search for the album to get album art
        albums = spotify_client.search_albums(query, limit=1)
        if albums:
            return getattr(albums[0], 'image_url', None) or None

        return None

    except Exception as e:
        logger.warning(f"Could not fetch Spotify album art: {e}")
        return None

def _save_cover_art(self, album_folder: Path, album_art_url: str):
    """Save cover.jpg to album folder (mirrors production behavior).

    Best-effort: failures are logged and never raised. An existing
    cover.jpg is left untouched.

    Args:
        album_folder: Folder that should receive cover.jpg
        album_art_url: URL of the image to download; a falsy value is a no-op
    """
    # Nothing to fetch (e.g. no art was found upstream).
    if not album_art_url:
        return

    try:
        # Inside the try so a missing requests is logged, not raised.
        import requests

        cover_path = album_folder / "cover.jpg"

        # Don't overwrite existing cover art
        if cover_path.exists():
            logger.debug(f" ℹ️ cover.jpg already exists, skipping")
            return

        logger.debug(f" Downloading cover.jpg...")

        response = requests.get(album_art_url, timeout=10)
        response.raise_for_status()

        # Save to file
        cover_path.write_bytes(response.content)

        logger.debug(f" Saved cover.jpg ({len(response.content) // 1024} KB)")

    except Exception as e:
        logger.warning(f" Could not save cover.jpg: {e}")

def _create_lyrics_file(self, audio_file_path: str, spotify_track: SpotifyTrack):
    """
    Create .lrc lyrics file using LRClib API (mirrors production lyrics flow).

    Best-effort: a missing lyrics client or any failure is logged and swallowed.

    Args:
        audio_file_path: Path of the audio file, passed through to the lyrics client
        spotify_track: Track supplying name, artist, album and duration
    """
    try:
        # Import lyrics client
        from core.lyrics_client import lyrics_client

        if not lyrics_client.api:
            logger.debug(f" LRClib API not available - skipping lyrics")
            return

        logger.debug(f" Fetching lyrics from LRClib...")

        # Gather the lookup keys from the Spotify track.
        duration_ms = spotify_track.duration_ms
        lookup = {
            'audio_file_path': audio_file_path,
            'track_name': spotify_track.name,
            'artist_name': spotify_track.artists[0] if spotify_track.artists else "Unknown Artist",
            'album_name': spotify_track.album,
            'duration_seconds': int(duration_ms / 1000) if duration_ms else None,
        }

        # Create LRC file
        if lyrics_client.create_lrc_file(**lookup):
            logger.debug(f" Created .lrc lyrics file")
        else:
            logger.debug(f" No lyrics found on LRClib")

    except ImportError:
        logger.debug(f" lyrics_client not available - skipping lyrics")
    except Exception as e:
        logger.warning(f" Could not create lyrics file: {e}")
def search_and_download_best(self, spotify_track: SpotifyTrack, min_confidence: float = 0.58) -> Optional[str]:
    """
    Complete flow: search, find best match, download (mirrors soulseek flow).
    Uses production threshold of 0.58 for parity with Soulseek matching.

    Only the single highest-ranked match is attempted.

    Args:
        spotify_track: Spotify track to download
        min_confidence: Minimum confidence threshold (default: 0.58, same as production)

    Returns:
        Path to downloaded file, or None if failed
    """
    # A track may arrive without artists; fall back instead of raising
    # IndexError before the search has even started.
    artists = spotify_track.artists
    display_artist = artists[0] if artists else "Unknown Artist"
    logger.info(f"Starting YouTube download flow for: {spotify_track.name} by {display_artist}")

    # Generate search query. The placeholder artist stays out of the query
    # so it cannot skew the results.
    if artists:
        query = f"{artists[0]} {spotify_track.name}"
    else:
        query = f"{spotify_track.name}"

    # Search YouTube
    results = self.search(query, max_results=10)

    if not results:
        logger.error(f"No YouTube results found for query: {query}")
        return None

    # Find best matches
    matches = self.find_best_matches(spotify_track, results, min_confidence=min_confidence)

    if not matches:
        logger.error(f"No matches above {min_confidence} confidence threshold")
        return None

    # Try downloading best match
    best_match = matches[0]
    logger.info(f"Best match: {best_match.title} (confidence: {best_match.confidence:.2f})")

    return self.download(best_match, spotify_track)