mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
925 lines
36 KiB
925 lines
36 KiB
"""
|
|
Tidal Download Client
|
|
Alternative music download source using tidalapi.
|
|
|
|
This client provides:
|
|
- Tidal search with metadata
|
|
- Device flow authentication (link.tidal.com)
|
|
- HiRes/Lossless/High quality audio downloads via Tidal v2 trackManifests endpoint
|
|
- Drop-in replacement compatible with Soulseek interface
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import asyncio
|
|
import uuid
|
|
import time
|
|
import shutil
|
|
import subprocess
|
|
from typing import List, Optional, Dict, Any, Tuple
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from urllib.parse import urljoin
|
|
|
|
try:
|
|
import tidalapi
|
|
except ImportError:
|
|
tidalapi = None
|
|
|
|
import requests as http_requests
|
|
|
|
from utils.logging_config import get_logger
|
|
from config.settings import config_manager
|
|
|
|
# Import Soulseek data structures for drop-in replacement compatibility
|
|
from core.download_plugins.types import TrackResult, AlbumResult, DownloadStatus
|
|
|
|
logger = get_logger("tidal_download_client")
|
|
|
|
|
|
# Quality tiers definitions (used for display in search results)
|
|
QUALITY_MAP = {
|
|
'low': {
|
|
'label': 'AAC 96kbps',
|
|
'extension': 'm4a',
|
|
'bitrate': 96,
|
|
'codec': 'aac',
|
|
},
|
|
'high': {
|
|
'label': 'AAC 320kbps',
|
|
'extension': 'm4a',
|
|
'bitrate': 320,
|
|
'codec': 'aac',
|
|
},
|
|
'lossless': {
|
|
'label': 'FLAC 16-bit/44.1kHz',
|
|
'extension': 'flac',
|
|
'bitrate': 1411,
|
|
'codec': 'flac',
|
|
},
|
|
'hires': {
|
|
'label': 'FLAC 24-bit/96kHz',
|
|
'extension': 'flac',
|
|
'bitrate': 9216,
|
|
'codec': 'flac',
|
|
},
|
|
}
|
|
|
|
# HLS-specific format mapping for v2 trackManifests endpoint
|
|
HLS_QUALITY_MAP = {
|
|
'hires': {
|
|
'formats': ['FLAC_HIRES'],
|
|
'manifest_type': 'HLS',
|
|
'extension': 'flac',
|
|
},
|
|
'lossless': {
|
|
'formats': ['FLAC'],
|
|
'manifest_type': 'HLS',
|
|
'extension': 'flac',
|
|
},
|
|
'high': {
|
|
'formats': ['AACLC'],
|
|
'manifest_type': 'HLS',
|
|
'extension': 'm4a',
|
|
},
|
|
'low': {
|
|
'formats': ['HEAACV1'],
|
|
'manifest_type': 'HLS',
|
|
'extension': 'm4a',
|
|
},
|
|
}
|
|
|
|
HLS_MAP_TAG_RE = re.compile(r'#EXT-X-MAP:.*URI="([^"]+)"')
|
|
|
|
|
|
from core.download_plugins.base import DownloadSourcePlugin
|
|
|
|
|
|
def _looks_like_json_decode_error(exc: Exception) -> bool:
|
|
name = exc.__class__.__name__.lower()
|
|
message = str(exc).lower()
|
|
return (
|
|
"jsondecodeerror" in name
|
|
or "expecting value" in message
|
|
or "could not decode json" in message
|
|
)
|
|
|
|
|
|
class TidalDownloadClient(DownloadSourcePlugin):
|
|
"""
|
|
Tidal download client using tidalapi.
|
|
Provides search, matching, and download capabilities as a drop-in alternative to YouTube/Soulseek.
|
|
"""
|
|
|
|
def __init__(self, download_path: str = None):
|
|
if tidalapi is None:
|
|
logger.warning("tidalapi not installed — Tidal downloads unavailable")
|
|
|
|
if download_path is None:
|
|
download_path = config_manager.get('soulseek.download_path', './downloads')
|
|
|
|
self.download_path = Path(download_path)
|
|
self.download_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
logger.info(f"Tidal download client using download path: {self.download_path}")
|
|
|
|
self.shutdown_check = None
|
|
|
|
self.session: Optional['tidalapi.Session'] = None
|
|
self._init_session()
|
|
|
|
self._device_auth_future = None
|
|
self._device_auth_link = None
|
|
|
|
# Engine reference is populated by set_engine() at registration
|
|
# time. Until then dispatch returns None — orchestrator wires
|
|
# this immediately so the only None case is tests that bypass
|
|
# the orchestrator.
|
|
self._engine = None
|
|
|
|
def set_engine(self, engine):
|
|
"""Engine callback — gives the client access to the central
|
|
thread worker + state store. Engine calls this during
|
|
``register_plugin`` if the plugin defines it."""
|
|
self._engine = engine
|
|
|
|
def set_shutdown_check(self, check_callable):
|
|
self.shutdown_check = check_callable
|
|
|
|
def _init_session(self):
|
|
if tidalapi is None:
|
|
return
|
|
|
|
self.session = tidalapi.Session()
|
|
|
|
saved = config_manager.get('tidal_download.session', {})
|
|
token_type = saved.get('token_type', '')
|
|
access_token = saved.get('access_token', '')
|
|
refresh_token = saved.get('refresh_token', '')
|
|
expiry_time = saved.get('expiry_time', 0)
|
|
|
|
if token_type and access_token:
|
|
try:
|
|
expiry_dt = datetime.fromtimestamp(expiry_time, tz=timezone.utc) if expiry_time else None
|
|
|
|
restored = self.session.load_oauth_session(
|
|
token_type=token_type,
|
|
access_token=access_token,
|
|
refresh_token=refresh_token,
|
|
expiry_time=expiry_dt,
|
|
)
|
|
if restored and self.session.check_login():
|
|
logger.info("Restored Tidal download session from saved tokens")
|
|
self._save_session()
|
|
return
|
|
else:
|
|
logger.warning("Saved Tidal session tokens are invalid/expired")
|
|
except Exception as e:
|
|
logger.warning(f"Could not restore Tidal session: {e}")
|
|
|
|
def _save_session(self):
|
|
if not self.session:
|
|
return
|
|
config_manager.set('tidal_download.session', {
|
|
'token_type': self.session.token_type or '',
|
|
'access_token': self.session.access_token or '',
|
|
'refresh_token': self.session.refresh_token or '',
|
|
'expiry_time': self.session.expiry_time.timestamp() if self.session.expiry_time else 0,
|
|
})
|
|
|
|
def is_authenticated(self) -> bool:
|
|
if not self.session:
|
|
return False
|
|
try:
|
|
return self.session.check_login()
|
|
except Exception:
|
|
return False
|
|
|
|
def start_device_auth(self) -> Optional[Dict[str, str]]:
|
|
if tidalapi is None:
|
|
return None
|
|
|
|
try:
|
|
if not self.session:
|
|
self.session = tidalapi.Session()
|
|
|
|
login, future = self.session.login_oauth()
|
|
self._device_auth_future = future
|
|
raw_uri = login.verification_uri_complete or f"link.tidal.com/{login.user_code}"
|
|
if not raw_uri.startswith(('http://', 'https://')):
|
|
raw_uri = f"https://{raw_uri}"
|
|
self._device_auth_link = {
|
|
'verification_uri': raw_uri,
|
|
'user_code': login.user_code,
|
|
}
|
|
logger.info(f"Tidal device auth started — code: {login.user_code}")
|
|
return self._device_auth_link
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to start Tidal device auth: {e}")
|
|
return None
|
|
|
|
def check_device_auth(self) -> Dict[str, Any]:
|
|
if not self._device_auth_future:
|
|
return {'status': 'error', 'message': 'No auth in progress'}
|
|
|
|
try:
|
|
if self._device_auth_future.running():
|
|
return {
|
|
'status': 'pending',
|
|
'verification_uri': self._device_auth_link.get('verification_uri', ''),
|
|
'user_code': self._device_auth_link.get('user_code', ''),
|
|
}
|
|
|
|
result = self._device_auth_future.result(timeout=0)
|
|
if self.session and self.session.check_login():
|
|
self._save_session()
|
|
logger.info("Tidal device auth completed successfully")
|
|
return {'status': 'completed', 'message': 'Authenticated successfully'}
|
|
else:
|
|
return {'status': 'error', 'message': 'Auth completed but session invalid'}
|
|
|
|
except Exception as e:
|
|
if _looks_like_json_decode_error(e):
|
|
logger.error(
|
|
"Tidal device auth check received a non-JSON response from Tidal's token endpoint: %s",
|
|
e,
|
|
)
|
|
return {
|
|
'status': 'error',
|
|
'message': (
|
|
"Tidal returned an invalid auth response while SoulSync was finishing login. "
|
|
"Try again after disabling VPN/proxy/network filtering, then restart SoulSync if it repeats."
|
|
),
|
|
}
|
|
logger.error(f"Tidal device auth check error: {e}")
|
|
return {'status': 'error', 'message': str(e)}
|
|
|
|
def is_available(self) -> bool:
|
|
return tidalapi is not None and self.is_authenticated()
|
|
|
|
def is_configured(self) -> bool:
|
|
return self.is_available()
|
|
|
|
async def check_connection(self) -> bool:
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(None, self.is_available)
|
|
except Exception as e:
|
|
logger.error(f"Tidal connection check failed: {e}")
|
|
return False
|
|
|
|
_QUALIFIER_KEYWORDS = frozenset({
|
|
'remix', 'mix', 'edit', 'version', 'dub', 'rmx', 'vip', 'cut',
|
|
'rework', 'bootleg', 'flip',
|
|
'live', 'concert', 'unplugged', 'acoustic', 'session',
|
|
'instrumental', 'karaoke', 'demo', 'bonus',
|
|
'extended', 'radio',
|
|
})
|
|
|
|
@classmethod
|
|
def _extract_qualifiers(cls, query: str) -> List[str]:
|
|
if not query:
|
|
return []
|
|
found = []
|
|
q_lower = query.lower()
|
|
for kw in cls._QUALIFIER_KEYWORDS:
|
|
if re.search(r'\b' + re.escape(kw) + r'\b', q_lower):
|
|
found.append(kw)
|
|
return found
|
|
|
|
@staticmethod
|
|
def _track_name_contains_qualifiers(track_name: str, qualifiers: List[str]) -> bool:
|
|
if not qualifiers:
|
|
return True
|
|
if not track_name:
|
|
return False
|
|
name_lower = track_name.lower()
|
|
for kw in qualifiers:
|
|
if not re.search(r'\b' + re.escape(kw) + r'\b', name_lower):
|
|
return False
|
|
return True
|
|
|
|
@classmethod
|
|
def _track_matches_qualifiers(cls, track, qualifiers: List[str]) -> bool:
|
|
"""Issue #589 — qualifier check must inspect both track.name AND
|
|
track.album.name. For MTV Unplugged-style releases the live /
|
|
unplugged signal lives in the album title, not the track title.
|
|
A track passes if every required qualifier appears as a whole
|
|
word in either the track name OR its album name.
|
|
"""
|
|
if not qualifiers:
|
|
return True
|
|
track_name = (getattr(track, 'name', '') or '').lower()
|
|
album = getattr(track, 'album', None)
|
|
album_name = (getattr(album, 'name', '') or '').lower() if album else ''
|
|
haystack = f"{track_name} {album_name}".strip()
|
|
if not haystack:
|
|
return False
|
|
for kw in qualifiers:
|
|
if not re.search(r'\b' + re.escape(kw) + r'\b', haystack):
|
|
return False
|
|
return True
|
|
|
|
@staticmethod
|
|
def _generate_shortened_queries(original: str) -> List[str]:
|
|
variants: List[str] = []
|
|
seen = {original.strip().lower()}
|
|
|
|
def _add(candidate: str) -> None:
|
|
candidate = candidate.strip()
|
|
if candidate and candidate.lower() not in seen:
|
|
variants.append(candidate)
|
|
seen.add(candidate.lower())
|
|
|
|
_add(re.sub(r'\s*[\(\[][^\)\]]*[\)\]]\s*$', '', original))
|
|
_add(re.sub(r'\s*[\(\[][^\)\]]*[\)\]]', ' ', original))
|
|
|
|
tokens = original.split()
|
|
|
|
if len(tokens) >= 3:
|
|
_add(' '.join(tokens[:-1]))
|
|
|
|
if len(tokens) >= 4:
|
|
_add(' '.join(tokens[:-2]))
|
|
|
|
if len(tokens) >= 5:
|
|
_add(' '.join(tokens[:-3]))
|
|
|
|
if len(tokens) >= 7:
|
|
_add(' '.join(tokens[:len(tokens) // 2 + 1]))
|
|
|
|
return variants
|
|
|
|
async def search(self, query: str, timeout: int = None, progress_callback=None) -> Tuple[List[TrackResult], List[AlbumResult]]:
|
|
if not self.is_available():
|
|
logger.warning("Tidal not available for search (not authenticated)")
|
|
return ([], [])
|
|
|
|
if not query or not isinstance(query, str):
|
|
logger.warning(f"Invalid Tidal search query: {query!r}")
|
|
return ([], [])
|
|
|
|
logger.info(f"Searching Tidal for: {query}")
|
|
|
|
try:
|
|
queries_to_try = [query] + self._generate_shortened_queries(query)
|
|
queries_to_try = queries_to_try[:5]
|
|
|
|
required_qualifiers = self._extract_qualifiers(query)
|
|
|
|
tidal_tracks: list = []
|
|
successful_query: Optional[str] = None
|
|
last_error: Optional[Exception] = None
|
|
any_fallback_filtered_out = False
|
|
|
|
loop = asyncio.get_event_loop()
|
|
for attempt_idx, attempt_query in enumerate(queries_to_try):
|
|
try:
|
|
q_copy = attempt_query
|
|
|
|
def _search(q=q_copy):
|
|
results = self.session.search(q, models=[tidalapi.media.Track], limit=50)
|
|
return results.get('tracks', []) if isinstance(results, dict) else []
|
|
|
|
found = await loop.run_in_executor(None, _search)
|
|
|
|
if found:
|
|
# Issue #589 — qualifier filter applies to ALL
|
|
# search attempts, not just fallbacks. If the
|
|
# primary query carries "live" / "unplugged" /
|
|
# etc, the studio cut should never be accepted
|
|
# just because Tidal returned it first. The
|
|
# filter inspects both track.name AND
|
|
# track.album.name (the live signal often lives
|
|
# in the album title for concert releases).
|
|
is_fallback = attempt_idx > 0
|
|
if required_qualifiers:
|
|
filtered = [
|
|
t for t in found
|
|
if self._track_matches_qualifiers(t, required_qualifiers)
|
|
]
|
|
if filtered:
|
|
tidal_tracks = filtered
|
|
successful_query = attempt_query
|
|
logger.info(
|
|
f"Tidal {'fallback' if is_fallback else 'primary'} kept "
|
|
f"{len(filtered)}/{len(found)} tracks after qualifier filter "
|
|
f"{required_qualifiers} for '{attempt_query}'"
|
|
)
|
|
break
|
|
else:
|
|
any_fallback_filtered_out = True
|
|
logger.debug(
|
|
f"Tidal {'fallback' if is_fallback else 'primary'} "
|
|
f"'{attempt_query}' returned {len(found)} tracks but none "
|
|
f"matched required qualifiers {required_qualifiers} — "
|
|
f"trying next variant"
|
|
)
|
|
if attempt_idx < len(queries_to_try) - 1:
|
|
await asyncio.sleep(0.1)
|
|
continue
|
|
else:
|
|
tidal_tracks = found
|
|
successful_query = attempt_query
|
|
break
|
|
|
|
if attempt_idx < len(queries_to_try) - 1:
|
|
logger.debug(f"Tidal returned 0 results for '{attempt_query}' — trying shortened variant")
|
|
await asyncio.sleep(0.1)
|
|
except Exception as e:
|
|
last_error = e
|
|
logger.debug(f"Tidal search attempt {attempt_idx + 1} failed: {e}")
|
|
|
|
if not tidal_tracks:
|
|
if last_error is not None:
|
|
import traceback
|
|
tb_str = ''.join(traceback.format_exception(
|
|
type(last_error), last_error, last_error.__traceback__
|
|
))
|
|
logger.error(
|
|
f"Tidal search failed after {len(queries_to_try)} attempts: {last_error}\n{tb_str}"
|
|
)
|
|
elif any_fallback_filtered_out:
|
|
logger.warning(
|
|
f"No Tidal results for '{query}' — fallbacks found broader matches but "
|
|
f"none preserved required qualifiers {required_qualifiers}"
|
|
)
|
|
else:
|
|
logger.warning(f"No Tidal results for: {query}")
|
|
return ([], [])
|
|
|
|
if successful_query and successful_query != query:
|
|
logger.info(f"Tidal fallback query succeeded: '{successful_query}' (original: '{query}')")
|
|
|
|
quality_key = config_manager.get('tidal_download.quality', 'lossless')
|
|
quality_info = QUALITY_MAP.get(quality_key, QUALITY_MAP['lossless'])
|
|
|
|
track_results = []
|
|
for track in tidal_tracks:
|
|
try:
|
|
track_result = self._tidal_to_track_result(track, quality_info)
|
|
track_results.append(track_result)
|
|
except Exception as e:
|
|
logger.debug(f"Skipping track conversion error: {e}")
|
|
|
|
logger.info(f"Found {len(track_results)} Tidal tracks")
|
|
return (track_results, [])
|
|
|
|
except Exception as e:
|
|
logger.error(f"Tidal search orchestration failed: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return ([], [])
|
|
|
|
def _tidal_to_track_result(self, track, quality_info: dict) -> TrackResult:
|
|
artist_name = track.artist.name if track.artist else 'Unknown Artist'
|
|
title = track.name or 'Unknown Title'
|
|
album_name = track.album.name if track.album else None
|
|
|
|
duration_ms = int(track.duration * 1000) if track.duration else None
|
|
|
|
display_name = f"{artist_name} - {title}"
|
|
filename = f"{track.id}||{display_name}"
|
|
|
|
track_result = TrackResult(
|
|
username='tidal',
|
|
filename=filename,
|
|
size=0,
|
|
bitrate=quality_info.get('bitrate'),
|
|
duration=duration_ms,
|
|
quality=quality_info.get('codec', 'flac'),
|
|
free_upload_slots=999,
|
|
upload_speed=999999,
|
|
queue_length=0,
|
|
artist=artist_name,
|
|
title=title,
|
|
album=album_name,
|
|
track_number=track.track_num,
|
|
_source_metadata={
|
|
'source': 'tidal',
|
|
'track_id': track.id,
|
|
'artist_id': track.artist.id if track.artist else None,
|
|
'isrc': track.isrc or None,
|
|
'bpm': track.bpm if track.bpm and track.bpm > 0 else None,
|
|
'copyright': track.copyright or None,
|
|
},
|
|
)
|
|
|
|
return track_result
|
|
|
|
def _parse_hls_playlist(self, text: str, playlist_url: str):
|
|
init_uri = None
|
|
segment_uris = []
|
|
variant_uri = None
|
|
|
|
lines = [line.strip() for line in text.splitlines() if line.strip()]
|
|
|
|
for index, line in enumerate(lines):
|
|
if line.startswith('#EXTM3U'):
|
|
continue
|
|
|
|
if line.startswith('#EXT-X-STREAM-INF'):
|
|
for next_line in lines[index + 1:]:
|
|
if not next_line.startswith('#'):
|
|
variant_uri = urljoin(playlist_url, next_line)
|
|
break
|
|
break
|
|
|
|
if line.startswith('#EXT-X-MAP'):
|
|
match = HLS_MAP_TAG_RE.search(line)
|
|
if match:
|
|
init_uri = match.group(1)
|
|
continue
|
|
|
|
if line.startswith('#'):
|
|
continue
|
|
|
|
segment_uris.append(urljoin(playlist_url, line))
|
|
|
|
if variant_uri:
|
|
return None, [variant_uri]
|
|
|
|
if not segment_uris:
|
|
raise ValueError('No segment URIs found in the HLS playlist')
|
|
|
|
if init_uri:
|
|
init_uri = urljoin(playlist_url, init_uri)
|
|
|
|
return init_uri, segment_uris
|
|
|
|
def _get_hls_manifest(self, track_id: int, quality: str = 'lossless') -> Optional[Dict]:
|
|
q_info = HLS_QUALITY_MAP.get(quality, HLS_QUALITY_MAP['lossless'])
|
|
formats = q_info['formats']
|
|
|
|
access_token = self.session.access_token
|
|
if not access_token:
|
|
logger.error("No Tidal access token available")
|
|
return None
|
|
|
|
url = f"https://openapi.tidal.com/v2/trackManifests/{track_id}"
|
|
params = [
|
|
('adaptive', 'true'),
|
|
('manifestType', 'HLS'),
|
|
('uriScheme', 'HTTPS'),
|
|
('usage', 'DOWNLOAD'),
|
|
]
|
|
for fmt in formats:
|
|
params.append(('formats', fmt))
|
|
|
|
headers = {
|
|
'Authorization': f'Bearer {access_token}',
|
|
'Accept': 'application/vnd.api+json',
|
|
}
|
|
|
|
try:
|
|
response = http_requests.get(url, params=params, headers=headers, timeout=20)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
except http_requests.HTTPError as e:
|
|
logger.warning(f"Failed to fetch HLS manifest for track {track_id}: {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.warning(f"Failed to fetch HLS manifest for track {track_id}: {e}")
|
|
return None
|
|
|
|
try:
|
|
attrs = data.get('data', {}).get('attributes', {})
|
|
uri = attrs.get('uri')
|
|
except (AttributeError, KeyError) as e:
|
|
logger.warning(f"Failed to extract playlist URI from manifest response for track {track_id}: {e}")
|
|
return None
|
|
|
|
if not uri:
|
|
logger.warning(f"No playlist URI in manifest for track {track_id}")
|
|
return None
|
|
|
|
try:
|
|
playlist_resp = http_requests.get(uri, allow_redirects=True, timeout=30)
|
|
playlist_resp.raise_for_status()
|
|
playlist_text = playlist_resp.text
|
|
except Exception as e:
|
|
logger.warning(f"Failed to fetch HLS playlist for track {track_id}: {e}")
|
|
return None
|
|
|
|
try:
|
|
init_uri, segment_uris = self._parse_hls_playlist(playlist_text, uri)
|
|
except ValueError as e:
|
|
logger.warning(f"Failed to parse HLS playlist for track {track_id}: {e}")
|
|
return None
|
|
|
|
if '#EXT-X-STREAM-INF' in playlist_text and segment_uris:
|
|
playlist_uri = segment_uris[0]
|
|
try:
|
|
logger.debug(f"Detected master HLS playlist, following variant: {playlist_uri}")
|
|
variant_resp = http_requests.get(playlist_uri, allow_redirects=True, timeout=30)
|
|
variant_resp.raise_for_status()
|
|
variant_text = variant_resp.text
|
|
init_uri, segment_uris = self._parse_hls_playlist(variant_text, playlist_uri)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to fetch variant playlist for track {track_id}: {e}")
|
|
return None
|
|
|
|
if init_uri:
|
|
logger.info(f"Tidal HLS manifest for track {track_id}: "
|
|
f"init segment + {len(segment_uris)} segments ({quality})")
|
|
else:
|
|
logger.info(f"Tidal HLS manifest for track {track_id}: "
|
|
f"{len(segment_uris)} segments ({quality})")
|
|
|
|
return {
|
|
'init_uri': init_uri,
|
|
'segment_uris': segment_uris,
|
|
'extension': QUALITY_MAP.get(quality, {}).get('extension', 'flac'),
|
|
'codec': QUALITY_MAP.get(quality, {}).get('codec', 'flac'),
|
|
'quality': quality,
|
|
}
|
|
|
|
def _demux_flac(self, input_path: Path, output_path: Path) -> None:
|
|
ffmpeg = shutil.which('ffmpeg')
|
|
if not ffmpeg:
|
|
tools_dir = Path(__file__).parent.parent / 'tools'
|
|
ffmpeg_candidate = tools_dir / ('ffmpeg.exe' if os.name == 'nt' else 'ffmpeg')
|
|
if ffmpeg_candidate.exists():
|
|
ffmpeg = str(ffmpeg_candidate)
|
|
else:
|
|
raise RuntimeError('ffmpeg is required to demux FLAC from MP4. Install ffmpeg and retry.')
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
[
|
|
ffmpeg,
|
|
'-y',
|
|
'-hide_banner',
|
|
'-loglevel', 'error',
|
|
'-i', str(input_path),
|
|
'-map', '0:a:0',
|
|
'-c', 'copy',
|
|
str(output_path),
|
|
],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
except subprocess.CalledProcessError as exc:
|
|
raise RuntimeError(
|
|
f'ffmpeg failed while demuxing {input_path} -> {output_path}: '
|
|
f'{exc.returncode}\n{exc.stderr}'
|
|
) from exc
|
|
|
|
async def download(self, username: str, filename: str, file_size: int = 0) -> Optional[str]:
|
|
if '||' not in filename:
|
|
logger.error(f"Invalid filename format: {filename}")
|
|
return None
|
|
if self._engine is None:
|
|
# Raise rather than return None so the orchestrator's
|
|
# download_with_fallback surfaces a real warning + tries
|
|
# the next source. Returning None silently dropped the
|
|
# download with no user feedback (per JohnBaumb).
|
|
raise RuntimeError("Tidal client has no engine reference — cannot dispatch download")
|
|
|
|
track_id_str, display_name = filename.split('||', 1)
|
|
try:
|
|
track_id = int(track_id_str)
|
|
except ValueError:
|
|
logger.error(f"Invalid Tidal track ID: {track_id_str}")
|
|
return None
|
|
|
|
logger.info(f"Starting Tidal download: {display_name}")
|
|
|
|
return self._engine.worker.dispatch(
|
|
source_name='tidal',
|
|
target_id=track_id,
|
|
display_name=display_name,
|
|
original_filename=filename,
|
|
impl_callable=self._download_sync,
|
|
extra_record_fields={
|
|
'track_id': track_id,
|
|
'display_name': display_name,
|
|
},
|
|
)
|
|
|
|
def _download_sync(self, download_id: str, track_id: int, display_name: str) -> Optional[str]:
|
|
if not self.session or not self.session.check_login():
|
|
logger.error("Tidal session not authenticated")
|
|
return None
|
|
|
|
quality_key = config_manager.get('tidal_download.quality', 'lossless')
|
|
chain = ['hires', 'lossless', 'high', 'low']
|
|
start = chain.index(quality_key) if quality_key in chain else 1
|
|
allow_fallback = config_manager.get('tidal_download.allow_fallback', True)
|
|
chain = chain[start:] if allow_fallback else [quality_key]
|
|
|
|
MIN_AUDIO_SIZE = 100 * 1024
|
|
|
|
for q_key in chain:
|
|
if self.shutdown_check and self.shutdown_check():
|
|
logger.info("Shutdown detected, aborting Tidal download")
|
|
return None
|
|
|
|
manifest_info = self._get_hls_manifest(track_id, quality=q_key)
|
|
if not manifest_info or not manifest_info.get('segment_uris'):
|
|
logger.warning(f"No HLS manifest at quality {q_key}, trying next")
|
|
continue
|
|
|
|
extension = manifest_info['extension']
|
|
safe_name = re.sub(r'[<>:"/\\|?*]', '_', display_name)
|
|
out_filename = f"{safe_name}.{extension}"
|
|
out_path = self.download_path / out_filename
|
|
|
|
is_flac = q_key in ('hires', 'lossless')
|
|
intermediate_path = out_path.with_suffix('.m4a') if is_flac else out_path
|
|
|
|
try:
|
|
init_uri = manifest_info.get('init_uri')
|
|
segment_uris = manifest_info['segment_uris']
|
|
total_segments = len(segment_uris) + (1 if init_uri else 0)
|
|
|
|
logger.info(f"Downloading from Tidal ({q_key}): {out_filename} "
|
|
f"({total_segments} segments)")
|
|
|
|
downloaded = 0
|
|
speed_start = time.time()
|
|
segments_completed = 0
|
|
|
|
if self._engine is not None:
|
|
self._engine.update_record('tidal', download_id, {'size': 0})
|
|
|
|
with intermediate_path.open('wb') as output_file:
|
|
if init_uri:
|
|
if self.shutdown_check and self.shutdown_check():
|
|
logger.info("Shutdown detected, aborting Tidal download")
|
|
intermediate_path.unlink(missing_ok=True)
|
|
return None
|
|
|
|
logger.debug(f"Downloading init segment: {init_uri}")
|
|
init_data = self._download_segment_with_retry(init_uri)
|
|
output_file.write(init_data)
|
|
downloaded += len(init_data)
|
|
segments_completed += 1
|
|
|
|
self._update_download_progress(download_id, downloaded,
|
|
segments_completed, total_segments, speed_start)
|
|
|
|
for segment_url in segment_uris:
|
|
if self.shutdown_check and self.shutdown_check():
|
|
logger.info("Shutdown detected, aborting Tidal download")
|
|
intermediate_path.unlink(missing_ok=True)
|
|
return None
|
|
|
|
segment_data = self._download_segment_with_retry(segment_url)
|
|
output_file.write(segment_data)
|
|
downloaded += len(segment_data)
|
|
segments_completed += 1
|
|
|
|
self._update_download_progress(download_id, downloaded,
|
|
segments_completed, total_segments, speed_start)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Download failed at quality {q_key}: {e}")
|
|
intermediate_path.unlink(missing_ok=True)
|
|
continue
|
|
|
|
if downloaded < MIN_AUDIO_SIZE:
|
|
logger.warning(f"File too small at {q_key} ({downloaded} bytes), trying next")
|
|
intermediate_path.unlink(missing_ok=True)
|
|
continue
|
|
|
|
try:
|
|
if is_flac:
|
|
logger.info(f"Demuxing FLAC from MP4 container: {intermediate_path} -> {out_path}")
|
|
self._demux_flac(intermediate_path, out_path)
|
|
intermediate_path.unlink(missing_ok=True)
|
|
final_size = out_path.stat().st_size if out_path.exists() else 0
|
|
else:
|
|
final_size = intermediate_path.stat().st_size if intermediate_path.exists() else 0
|
|
|
|
if final_size < MIN_AUDIO_SIZE:
|
|
logger.warning(f"Final file too small after processing at {q_key} "
|
|
f"({final_size} bytes), trying next")
|
|
out_path.unlink(missing_ok=True)
|
|
continue
|
|
|
|
logger.info(f"Tidal download complete ({q_key}): {out_path} "
|
|
f"({final_size / (1024*1024):.1f} MB)")
|
|
return str(out_path)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Post-processing failed at quality {q_key}: {e}")
|
|
out_path.unlink(missing_ok=True)
|
|
intermediate_path.unlink(missing_ok=True)
|
|
continue
|
|
|
|
logger.error(f"All quality tiers exhausted for '{display_name}'")
|
|
return None
|
|
|
|
def _download_segment_with_retry(self, url: str) -> bytes:
|
|
"""Download a single HLS segment with 3 retries and 2s fixed backoff."""
|
|
last_error = None
|
|
for attempt in range(4):
|
|
try:
|
|
resp = http_requests.get(url, allow_redirects=True, timeout=30)
|
|
resp.raise_for_status()
|
|
return resp.content
|
|
except http_requests.exceptions.HTTPError as e:
|
|
status = e.response.status_code if e.response is not None else 0
|
|
if 400 <= status < 500:
|
|
raise
|
|
last_error = e
|
|
except (http_requests.exceptions.Timeout,
|
|
http_requests.exceptions.ConnectionError) as e:
|
|
last_error = e
|
|
|
|
if attempt < 3:
|
|
if self.shutdown_check and self.shutdown_check():
|
|
raise RuntimeError("Shutdown requested")
|
|
logger.warning(f"Tidal segment download failed (attempt {attempt + 1}/4), "
|
|
f"retrying in 2s: {url}")
|
|
time.sleep(2)
|
|
|
|
raise last_error
|
|
|
|
def _update_download_progress(self, download_id: str, downloaded: int,
|
|
segments_completed: int, total_segments: int,
|
|
speed_start: float):
|
|
if self._engine is None:
|
|
return
|
|
record = self._engine.get_record('tidal', download_id)
|
|
if record is None:
|
|
return
|
|
|
|
now = time.time()
|
|
elapsed_total = now - speed_start
|
|
speed = int(downloaded / elapsed_total) if elapsed_total > 0 else 0
|
|
|
|
progress = record.get('progress', 0.0)
|
|
if total_segments > 0:
|
|
progress = round(min((segments_completed / total_segments) * 100, 99.9), 1)
|
|
|
|
time_remaining = None
|
|
if speed > 0:
|
|
remaining_bytes = downloaded * (total_segments / max(segments_completed, 1)) - downloaded
|
|
if remaining_bytes > 0:
|
|
time_remaining = int(remaining_bytes / speed)
|
|
|
|
self._engine.update_record('tidal', download_id, {
|
|
'transferred': downloaded,
|
|
'speed': speed,
|
|
'progress': progress,
|
|
'time_remaining': time_remaining,
|
|
})
|
|
|
|
def _record_to_status(self, record):
|
|
return DownloadStatus(
|
|
id=record['id'],
|
|
filename=record['filename'],
|
|
username=record['username'],
|
|
state=record['state'],
|
|
progress=record['progress'],
|
|
size=record.get('size', 0),
|
|
transferred=record.get('transferred', 0),
|
|
speed=record.get('speed', 0),
|
|
time_remaining=record.get('time_remaining'),
|
|
file_path=record.get('file_path'),
|
|
)
|
|
|
|
async def get_all_downloads(self) -> List[DownloadStatus]:
|
|
if self._engine is None:
|
|
return []
|
|
return [
|
|
self._record_to_status(record)
|
|
for record in self._engine.iter_records_for_source('tidal')
|
|
]
|
|
|
|
async def get_download_status(self, download_id: str) -> Optional[DownloadStatus]:
|
|
if self._engine is None:
|
|
return None
|
|
record = self._engine.get_record('tidal', download_id)
|
|
return self._record_to_status(record) if record is not None else None
|
|
|
|
async def cancel_download(self, download_id: str, username: str = None, remove: bool = False) -> bool:
|
|
if self._engine is None:
|
|
return False
|
|
if self._engine.get_record('tidal', download_id) is None:
|
|
logger.warning(f"Tidal download {download_id} not found")
|
|
return False
|
|
self._engine.update_record('tidal', download_id, {'state': 'Cancelled'})
|
|
logger.info(f"Marked Tidal download {download_id} as cancelled")
|
|
if remove:
|
|
self._engine.remove_record('tidal', download_id)
|
|
logger.info(f"Removed Tidal download {download_id} from queue")
|
|
return True
|
|
|
|
async def clear_all_completed_downloads(self) -> bool:
|
|
if self._engine is None:
|
|
return True
|
|
try:
|
|
terminal = {'Completed, Succeeded', 'Cancelled', 'Errored', 'Aborted'}
|
|
for record in list(self._engine.iter_records_for_source('tidal')):
|
|
if record.get('state') in terminal:
|
|
self._engine.remove_record('tidal', record['id'])
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error clearing downloads: {e}")
|
|
return False
|