mirror of https://github.com/Nezreka/SoulSync.git
Add optional post-download audio fingerprint verification using AcoustID. Downloads are verified against the expected track/artist using fuzzy string matching on AcoustID results. Mismatched files are quarantined and automatically added to the wishlist for retry.
- AcoustID verification with title/artist fuzzy matching (not MBID comparison)
- Quarantine system with JSON metadata sidecars for failed verifications
- fpcalc binary auto-download for Windows, macOS (universal), and Linux
- MusicBrainz enrichment worker with live status UI and track badges
- Settings page AcoustID section with real-fingerprint connection test
- Source reuse for album downloads to keep tracks from same Soulseek user
- Enhanced search queries for better track matching
- Bug fixes: wishlist tracking, album splitting, regex & handling, log rotation
pull/130/head 1.5
parent
2d97d5c7d2
commit
d9efcbdf99
@ -0,0 +1,450 @@
|
||||
"""
|
||||
AcoustID Client for audio fingerprinting and lookup.
|
||||
|
||||
Uses the pyacoustid library which handles:
|
||||
- Fingerprint generation via chromaprint library
|
||||
- AcoustID API lookups
|
||||
- Rate limiting
|
||||
|
||||
The fpcalc binary is auto-downloaded if not found (Windows, macOS, Linux x86_64).
|
||||
"""
|
||||
|
||||
import threading
|
||||
import sys
|
||||
import platform
|
||||
import zipfile
|
||||
import tarfile
|
||||
import tempfile
|
||||
import urllib.request
|
||||
from typing import Dict, List, Optional, Any, Tuple
|
||||
from pathlib import Path
|
||||
import os
|
||||
import shutil
|
||||
import logging.handlers
|
||||
|
||||
from utils.logging_config import get_logger
|
||||
from config.settings import config_manager
|
||||
|
||||
# Directory holding a locally downloaded fpcalc binary, and the chromaprint
# release that the download URLs are built from.
FPCALC_BIN_DIR = Path(__file__).parent.parent / "bin"
CHROMAPRINT_VERSION = "1.5.1"

# Module logger for the AcoustID client.
logger = get_logger("acoustid_client")

# AcoustID output is additionally written to its own rotating log file
# (5 MiB per file, two backups kept).
_acoustid_log_path = Path(__file__).parent.parent / "logs" / "acoustid.log"
_acoustid_log_path.parent.mkdir(parents=True, exist_ok=True)

_acoustid_file_handler = logging.handlers.RotatingFileHandler(
    _acoustid_log_path,
    maxBytes=5*1024*1024,
    backupCount=2,
    encoding='utf-8',
)
_acoustid_file_handler.setLevel(logging.DEBUG)
_acoustid_file_handler.setFormatter(
    logging.Formatter(
        fmt='%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
)

# The same handler serves this module's logger and the logger named
# "newmusic.acoustid_verification" (presumably the verification module's
# logger as created by get_logger -- confirm against utils.logging_config).
logger.addHandler(_acoustid_file_handler)
logging.getLogger("newmusic.acoustid_verification").addHandler(_acoustid_file_handler)

# pyacoustid is optional: record whether it could be imported.
try:
    import acoustid
except ImportError:
    ACOUSTID_AVAILABLE = False
    logger.warning("pyacoustid library not installed - run: pip install pyacoustid")
else:
    ACOUSTID_AVAILABLE = True
    logger.info("pyacoustid library loaded successfully")
|
||||
|
||||
def _get_fpcalc_download_url() -> Optional[str]:
    """
    Build the chromaprint release URL of the fpcalc archive for this platform.

    Returns:
        The archive URL, or None when the CPU architecture is not recognised
        or no archive is offered for the OS/architecture combination
        (a warning is logged in both cases).
    """
    system = platform.system().lower()
    machine = platform.machine().lower()

    # Collapse the various spellings of each architecture into one label.
    arch_aliases = {
        'x86_64': 'x86_64',
        'amd64': 'x86_64',
        'i386': 'i686',
        'i686': 'i686',
        'x86': 'i686',
        'arm64': 'aarch64',
        'aarch64': 'aarch64',
    }
    arch = arch_aliases.get(machine)
    if arch is None:
        logger.warning(f"Unknown architecture: {machine}")
        return None

    base_url = f"https://github.com/acoustid/chromaprint/releases/download/v{CHROMAPRINT_VERSION}"
    archive_prefix = f"{base_url}/chromaprint-fpcalc-{CHROMAPRINT_VERSION}"

    if system == 'darwin':
        # Universal build supports both Intel and Apple Silicon natively
        return f"{archive_prefix}-macos-universal.tar.gz"

    if arch == 'x86_64':
        if system == 'windows':
            return f"{archive_prefix}-windows-x86_64.zip"
        if system == 'linux':
            return f"{archive_prefix}-linux-x86_64.tar.gz"

    logger.warning(f"No fpcalc download available for {system}-{arch}")
    return None
|
||||
|
||||
|
||||
def _download_fpcalc() -> Optional[str]:
    """
    Download and extract the fpcalc binary for the current platform.

    The release archive is fetched into a temporary file, the fpcalc
    executable is copied out of it into ``FPCALC_BIN_DIR`` and (on non-Windows
    systems) marked executable. The temporary archive is removed whether or
    not the download/extraction succeeds.

    Returns:
        Path to the fpcalc binary if successful, None otherwise (no download
        URL for this platform, binary missing from the archive, or any error
        while downloading/extracting -- errors are logged, never raised).
    """
    url = _get_fpcalc_download_url()
    if not url:
        return None

    is_windows = platform.system().lower() == 'windows'
    fpcalc_name = "fpcalc.exe" if is_windows else "fpcalc"
    fpcalc_dest = FPCALC_BIN_DIR / fpcalc_name
    tmp_path = None

    try:
        logger.info(f"Downloading fpcalc from: {url}")

        # Create bin directory
        FPCALC_BIN_DIR.mkdir(parents=True, exist_ok=True)

        # Reserve a temp file name, close our handle, then download into it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=Path(url).suffix) as tmp:
            tmp_path = tmp.name
        urllib.request.urlretrieve(url, tmp_path)

        # Track extraction explicitly so a missing binary is reported as such
        # instead of surfacing as a chmod failure.
        extracted = False

        if url.endswith('.zip'):
            with zipfile.ZipFile(tmp_path, 'r') as zf:
                for name in zf.namelist():
                    if name.endswith(fpcalc_name):
                        with zf.open(name) as src, open(fpcalc_dest, 'wb') as dst:
                            shutil.copyfileobj(src, dst)
                        extracted = True
                        break
        elif url.endswith('.tar.gz'):
            with tarfile.open(tmp_path, 'r:gz') as tf:
                for member in tf.getmembers():
                    if not (member.isfile() and member.name.endswith('fpcalc')):
                        continue
                    # Copy the member's bytes straight to the destination:
                    # nothing is written to an archive-controlled path.
                    src = tf.extractfile(member)
                    if src is None:
                        continue
                    with src, open(fpcalc_dest, 'wb') as dst:
                        shutil.copyfileobj(src, dst)
                    extracted = True
                    break

        if not extracted:
            logger.error("fpcalc not found in downloaded archive")
            return None

        # Make executable on Unix
        if not is_windows:
            os.chmod(fpcalc_dest, 0o755)

        logger.info(f"fpcalc downloaded successfully: {fpcalc_dest}")
        return str(fpcalc_dest)

    except Exception as e:
        logger.error(f"Failed to download fpcalc: {e}")
        return None

    finally:
        # Always remove the downloaded archive, including on failure.
        if tmp_path:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|
||||
|
||||
|
||||
def _find_fpcalc() -> Optional[str]:
    """
    Locate an fpcalc binary, fetching one as a last resort.

    Lookup order: the system PATH, then the project's bin directory, then a
    fresh download via ``_download_fpcalc()``.

    Returns:
        Path to the binary as a string, or None if none could be obtained.
    """
    on_path = shutil.which("fpcalc") or shutil.which("fpcalc.exe")
    if on_path:
        return on_path

    binary_name = "fpcalc.exe" if platform.system().lower() == 'windows' else "fpcalc"
    bundled = FPCALC_BIN_DIR / binary_name
    if bundled.exists():
        return str(bundled)

    return _download_fpcalc()
|
||||
|
||||
|
||||
# Resolve the fingerprint backend once at import time. fpcalc is only looked
# for (and possibly downloaded) when pyacoustid itself is importable.
FPCALC_PATH = _find_fpcalc() if ACOUSTID_AVAILABLE else None
CHROMAPRINT_AVAILABLE = bool(FPCALC_PATH)

if CHROMAPRINT_AVAILABLE:
    logger.info(f"fpcalc binary ready: {FPCALC_PATH}")
    # Set environment variable so pyacoustid can find it
    os.environ['FPCALC'] = FPCALC_PATH
elif ACOUSTID_AVAILABLE:
    logger.warning("fpcalc not available - fingerprinting will not work")
|
||||
|
||||
|
||||
class AcoustIDClient:
    """
    Client for audio fingerprinting via pyacoustid.

    The API key and the enabled flag are read lazily from ``config_manager``
    and cached on the instance until ``refresh_config()`` is called.

    Usage:
        client = AcoustIDClient()
        available, reason = client.is_available()
        if available:
            result = client.fingerprint_and_lookup("/path/to/audio.mp3")
            if result:
                for mbid in result['recording_mbids']:
                    print(f"Match: {mbid}")
    """

    def __init__(self):
        """Initialize AcoustID client; config values are loaded on first use."""
        self._api_key = None
        self._enabled = None

    @property
    def api_key(self) -> str:
        """Get API key from config (cached)."""
        if self._api_key is None:
            self._api_key = config_manager.get('acoustid.api_key', '')
        return self._api_key

    @property
    def enabled(self) -> bool:
        """Check if AcoustID verification is enabled in config (cached)."""
        if self._enabled is None:
            self._enabled = config_manager.get('acoustid.enabled', False)
        return self._enabled

    def is_available(self) -> Tuple[bool, str]:
        """
        Check if AcoustID verification is available and ready.

        Returns:
            Tuple of (is_available, reason_message)
        """
        if not ACOUSTID_AVAILABLE:
            return False, "pyacoustid library not installed"

        if not self.api_key:
            return False, "No AcoustID API key configured"

        if not self.enabled:
            return False, "AcoustID verification is disabled"

        # Check if chromaprint or fpcalc is available
        if not self._check_fingerprint_available():
            return False, "Chromaprint library not installed (install libchromaprint1)"

        return True, "AcoustID verification ready"

    def _check_fingerprint_available(self) -> bool:
        """
        Check if we can generate fingerprints.

        If no backend was found at import time, retries ``_find_fpcalc()``
        and, on success, updates the module-level availability flags and the
        ``FPCALC`` environment variable.
        """
        global CHROMAPRINT_AVAILABLE, FPCALC_PATH

        if CHROMAPRINT_AVAILABLE:
            return True

        # Try to find/download fpcalc if not already available
        FPCALC_PATH = _find_fpcalc()
        if FPCALC_PATH:
            CHROMAPRINT_AVAILABLE = True
            os.environ['FPCALC'] = FPCALC_PATH
            logger.info(f"fpcalc now available: {FPCALC_PATH}")
            return True

        return False

    def _find_test_audio_file(self) -> Optional[str]:
        """
        Find an audio file to use for testing the AcoustID API key.

        Looks in the configured Soulseek transfer and download directories,
        first at the top level and then one directory down.

        Returns:
            Path of the first audio file found, or None.
        """
        audio_extensions = {'.mp3', '.flac', '.ogg', '.m4a', '.wav', '.wma', '.aac'}
        search_dirs = []

        # Check transfer and download paths from config
        transfer_path = config_manager.get('soulseek.transfer_path', '')
        download_path = config_manager.get('soulseek.download_path', '')
        if transfer_path:
            search_dirs.append(Path(transfer_path))
        if download_path:
            search_dirs.append(Path(download_path))

        for search_dir in search_dirs:
            if not search_dir.exists():
                continue
            # Walk up to 2 levels deep to find an audio file quickly
            for pattern in ('*', '*/*'):
                for f in search_dir.glob(pattern):
                    if f.is_file() and f.suffix.lower() in audio_extensions:
                        return str(f)
        return None

    @staticmethod
    def _interpret_lookup_response(data: Any) -> Tuple[bool, str]:
        """
        Turn a raw AcoustID lookup JSON response into an API-key verdict.

        Args:
            data: Decoded JSON body of a lookup request.

        Returns:
            (True, message) unless the response reports ``status == 'error'``;
            error code 4 is reported specifically as an invalid API key.
        """
        if not isinstance(data, dict):
            return False, "AcoustID API error: Unknown error"

        if data.get('status') == 'error':
            error = data.get('error') or {}
            error_code = error.get('code', 0)
            error_msg = error.get('message', 'Unknown error')

            # Error code 4 is specifically "invalid API key"
            if error_code == 4:
                return False, "Invalid AcoustID API key - get one from https://acoustid.org/new-application"
            return False, f"AcoustID API error: {error_msg}"

        # Status is 'ok' - key is valid
        return True, "AcoustID API key is valid"

    def test_api_key(self) -> Tuple[bool, str]:
        """
        Validate the API key by fingerprinting a real audio file and looking it up.
        Falls back to a direct API call if no audio files are available or the
        real-file attempt fails for reasons unrelated to the key.

        Returns:
            Tuple of (success, message)
        """
        if not self.api_key:
            return False, "No API key configured"

        import requests

        try:
            # Try to find a real audio file to fingerprint for an end-to-end test
            test_file = self._find_test_audio_file()

            if test_file and CHROMAPRINT_AVAILABLE:
                logger.info(f"Testing API key with real audio file: {test_file}")
                try:
                    import acoustid

                    # Ask for the raw response (parse=False) and inspect its
                    # status ourselves. fingerprint_and_lookup() cannot be
                    # used here: it swallows every error and returns None,
                    # which would make an invalid key look valid.
                    raw_response = acoustid.match(self.api_key, test_file, parse=False)
                    return self._interpret_lookup_response(raw_response)
                except Exception as e:
                    error_str = str(e).lower()
                    if 'invalid' in error_str and 'api' in error_str:
                        return False, "Invalid AcoustID API key - get one from https://acoustid.org/new-application"
                    # Fingerprint/lookup failed for non-key reasons, fall through to direct test
                    logger.warning(f"Real file test failed ({e}), trying direct API call")

            # Fallback: direct API call with minimal fingerprint
            url = 'https://api.acoustid.org/v2/lookup'
            params = {
                'client': self.api_key,
                'duration': 187,
                'fingerprint': 'AQADtMkWaYkSZRGO',
                'meta': 'recordings'
            }

            response = requests.get(url, params=params, timeout=10)
            return self._interpret_lookup_response(response.json())

        except requests.exceptions.Timeout:
            return False, "AcoustID API timeout - try again later"
        except requests.exceptions.RequestException as e:
            return False, f"Network error: {str(e)}"
        except Exception as e:
            logger.error(f"Error testing AcoustID API key: {e}")
            return False, f"Error: {str(e)}"

    def _api_key_preview(self) -> str:
        """Return a masked form of the API key that is safe to write to logs."""
        key = self.api_key or ''
        # Only a short prefix is shown so the log never holds most of the key.
        return f"{key[:4]}..." if len(key) > 4 else "***"

    def fingerprint_and_lookup(self, audio_file: str) -> Optional[Dict[str, Any]]:
        """
        Generate fingerprint and look up recording in AcoustID.

        This is the main method - combines fingerprinting and lookup in one call.

        Args:
            audio_file: Path to the audio file

        Returns:
            Dict with:
                'recordings': list of dicts with 'mbid', 'title', 'artist', 'score'
                'best_score': float (highest score across all results)
                'recording_mbids': list of unique MBIDs (for backward compat)
            Or None when the lookup could not be performed, an error occurred
            (errors are logged, never raised), or no recording matched.
        """
        if not ACOUSTID_AVAILABLE:
            logger.debug("Cannot lookup: pyacoustid not available")
            return None

        if not self.api_key:
            logger.debug("Cannot lookup: no API key")
            return None

        if not os.path.isfile(audio_file):
            logger.warning(f"Cannot lookup: file not found: {audio_file}")
            return None

        try:
            import acoustid

            api_key_preview = self._api_key_preview()
            logger.info(f"Fingerprinting and looking up: {audio_file} (API key: {api_key_preview})")

            # Use match() which handles fingerprinting + lookup + parsing
            logger.debug("Running acoustid.match()...")
            recordings = []
            seen_mbids = set()
            best_score = 0.0

            for result in acoustid.match(
                self.api_key,
                audio_file,
                parse=True
            ):
                # match() with parse=True returns (score, recording_id, title, artist)
                if not isinstance(result, tuple) or len(result) < 2:
                    logger.warning(f"Unexpected result format: {result}")
                    continue

                score = result[0]
                recording_id = result[1]
                title = result[2] if len(result) > 2 else None
                artist = result[3] if len(result) > 3 else None

                logger.debug(f"Got result: score={score}, id={recording_id}, title={title}, artist={artist}")

                if score > best_score:
                    best_score = score

                # Keep one entry per MBID (the first one seen).
                if recording_id and recording_id not in seen_mbids:
                    seen_mbids.add(recording_id)
                    recordings.append({
                        'mbid': recording_id,
                        'title': title,
                        'artist': artist,
                        'score': score,
                    })
                    logger.info(f"Found match: {title} by {artist} (MBID: {recording_id}, score: {score})")

            if not recordings:
                logger.info(f"No AcoustID matches found for: {audio_file}")
                return None

            logger.info(f"AcoustID found {len(recordings)} recording(s) (best score: {best_score:.2f})")
            return {
                'recordings': recordings,
                'best_score': best_score,
                'recording_mbids': list(seen_mbids),
            }

        except acoustid.NoBackendError:
            logger.error("Chromaprint library not found and fpcalc not available")
            return None
        except acoustid.FingerprintGenerationError as e:
            logger.warning(f"Failed to fingerprint {audio_file}: {e}")
            return None
        except acoustid.WebServiceError as e:
            # Log more details about the API error
            api_key_preview = self._api_key_preview()
            logger.warning(f"AcoustID API error (key: {api_key_preview}): {e}")
            # Check for common errors
            error_str = str(e).lower()
            if 'invalid' in error_str or 'unknown' in error_str:
                logger.error("API key appears to be invalid - check your AcoustID settings")
            elif 'rate' in error_str or 'limit' in error_str:
                logger.warning("Rate limited by AcoustID - will retry later")
            return None
        except Exception as e:
            logger.error(f"Unexpected error in AcoustID lookup: {e}", exc_info=True)
            return None

    def refresh_config(self):
        """Refresh cached config values (call after settings change)."""
        self._api_key = None
        self._enabled = None
|
||||
@ -0,0 +1,260 @@
|
||||
"""
|
||||
AcoustID Verification Service
|
||||
|
||||
Verifies downloaded audio files match expected track metadata by comparing
|
||||
title/artist from AcoustID fingerprint results against the expected track info.
|
||||
|
||||
If the audio fingerprint confidently identifies a DIFFERENT song than expected,
|
||||
the file is flagged as incorrect.
|
||||
"""
|
||||
|
||||
import re
|
||||
from difflib import SequenceMatcher
|
||||
from typing import Optional, Dict, Any, Tuple, List
|
||||
from enum import Enum
|
||||
from utils.logging_config import get_logger
|
||||
from core.acoustid_client import AcoustIDClient
|
||||
|
||||
logger = get_logger("acoustid_verification")

# Decision thresholds, all on a 0.0-1.0 scale.
# Lowest AcoustID fingerprint score that is trusted for verification.
MIN_ACOUSTID_SCORE = 0.80
# Title similarity required to count as a title match.
TITLE_MATCH_THRESHOLD = 0.70
# Artist similarity required to count as an artist match.
ARTIST_MATCH_THRESHOLD = 0.60
|
||||
|
||||
|
||||
class VerificationResult(Enum):
    """Possible outcomes of audio verification."""

    # Title/artist match - file is correct
    PASS = "pass"
    # Title/artist mismatch - wrong file downloaded
    FAIL = "fail"
    # Could not verify (error or unavailable) - continue normally
    SKIP = "skip"
    # Verification not enabled
    DISABLED = "disabled"
|
||||
|
||||
|
||||
def _normalize(text: str) -> str:
|
||||
"""Normalize a string for comparison: lowercase, strip parentheticals, punctuation."""
|
||||
if not text:
|
||||
return ""
|
||||
s = text.lower().strip()
|
||||
# Remove common parenthetical suffixes like (Live), (Remastered), (Radio Edit)
|
||||
s = re.sub(r'\s*\((?:live|remaster(?:ed)?|deluxe|bonus|radio\s*edit|single\s*version|visualize.*?)\)', '', s, flags=re.IGNORECASE)
|
||||
# Remove non-alphanumeric except spaces
|
||||
s = re.sub(r'[^\w\s]', '', s)
|
||||
# Collapse whitespace
|
||||
s = re.sub(r'\s+', ' ', s).strip()
|
||||
return s
|
||||
|
||||
|
||||
def _similarity(a: str, b: str) -> float:
    """
    Calculate similarity between two strings (0.0-1.0) after normalization.

    Both inputs are passed through ``_normalize``; equal normalized strings
    score 1.0, otherwise the ``SequenceMatcher`` ratio is returned.

    If normalization leaves either side empty (missing value, or a name made
    only of punctuation/symbols such as "!!!"), the raw strings are compared
    case-insensitively instead: an exact non-empty match scores 1.0, anything
    else 0.0. Without this, two identical symbol-only names would score 0.0.
    """
    na = _normalize(a)
    nb = _normalize(b)
    if not na or not nb:
        # Nothing comparable survived normalization; only an exact raw match counts.
        raw_a = (a or '').strip().casefold()
        raw_b = (b or '').strip().casefold()
        if raw_a and raw_a == raw_b:
            return 1.0
        return 0.0
    if na == nb:
        return 1.0
    return SequenceMatcher(None, na, nb).ratio()
|
||||
|
||||
|
||||
def _find_best_title_artist_match(
    recordings: List[Dict[str, Any]],
    expected_title: str,
    expected_artist: str,
) -> Tuple[Optional[Dict], float, float]:
    """
    Pick the AcoustID recording closest to the expected title/artist.

    Each recording is scored as 0.6 * title similarity + 0.4 * artist
    similarity; the first recording with the highest score above zero wins.

    Returns:
        (best_recording, title_similarity, artist_similarity); the recording
        is None and both similarities are 0.0 when no recording scores
        above zero.
    """
    winner = None
    winner_sims = (0.0, 0.0)
    top_combined = 0.0

    for candidate in recordings:
        t_sim = _similarity(expected_title, candidate.get('title') or '')
        a_sim = _similarity(expected_artist, candidate.get('artist') or '')

        # Title carries more weight: it is the primary identifier.
        weighted = (t_sim * 0.6) + (a_sim * 0.4)

        if weighted > top_combined:
            top_combined = weighted
            winner = candidate
            winner_sims = (t_sim, a_sim)

    return winner, winner_sims[0], winner_sims[1]
|
||||
|
||||
|
||||
class AcoustIDVerification:
    """
    Verification service that compares audio fingerprint identity
    against expected track metadata using title/artist matching.

    Design Principle: FAIL OPEN
    - Only returns FAIL when we are CONFIDENT the file is wrong
    - Any error or uncertainty results in SKIP (continue normally)
    - Never blocks downloads due to verification infrastructure issues

    Usage:
        verifier = AcoustIDVerification()
        result, message = verifier.verify_audio_file(
            "/path/to/downloaded.mp3",
            "Expected Song Title",
            "Expected Artist"
        )

        if result == VerificationResult.FAIL:
            # Move to quarantine
        else:
            # Continue with normal processing (PASS, SKIP, or DISABLED)
    """

    def __init__(self):
        """Initialize verification service."""
        self.acoustid_client = AcoustIDClient()

    def verify_audio_file(
        self,
        audio_file_path: str,
        expected_track_name: str,
        expected_artist_name: str,
        context: Optional[Dict[str, Any]] = None
    ) -> Tuple[VerificationResult, str]:
        """
        Verify that an audio file matches expected track metadata.

        Compares title/artist from AcoustID fingerprint results against
        the expected track info. No MusicBrainz lookup needed.

        Args:
            audio_file_path: Path to the downloaded audio file
            expected_track_name: Track name we expected to download
            expected_artist_name: Artist name we expected
            context: Optional download context for logging/debugging
                (not read by this method)

        Returns:
            Tuple of (VerificationResult, reason_message). Exceptions are
            never propagated; they are logged and reported as SKIP.
        """
        try:
            # Step 1: Check availability
            available, reason = self.acoustid_client.is_available()
            if not available:
                logger.debug(f"AcoustID verification skipped: {reason}")
                return VerificationResult.SKIP, reason

            # Step 2: Fingerprint and lookup in AcoustID
            logger.info(f"Fingerprinting and looking up: {audio_file_path}")
            acoustid_result = self.acoustid_client.fingerprint_and_lookup(audio_file_path)

            if not acoustid_result:
                return VerificationResult.SKIP, "Track not found in AcoustID database"

            recordings = acoustid_result.get('recordings', [])
            best_score = acoustid_result.get('best_score', 0)

            if not recordings:
                return VerificationResult.SKIP, "AcoustID returned no recordings"

            logger.debug(
                f"AcoustID returned {len(recordings)} recording(s) "
                f"(best fingerprint score: {best_score:.2f})"
            )

            # Step 3: Check fingerprint confidence
            if best_score < MIN_ACOUSTID_SCORE:
                msg = f"AcoustID fingerprint score too low ({best_score:.2f}) to verify"
                logger.info(msg)
                return VerificationResult.SKIP, msg

            # Step 4: Find best title/artist match among AcoustID results
            best_rec, title_sim, artist_sim = _find_best_title_artist_match(
                recordings, expected_track_name, expected_artist_name
            )

            if not best_rec:
                return VerificationResult.SKIP, "No recordings with title/artist info"

            matched_title = best_rec.get('title', '?')
            matched_artist = best_rec.get('artist', '?')

            logger.info(
                f"Best match: '{matched_title}' by '{matched_artist}' "
                f"(title_sim={title_sim:.2f}, artist_sim={artist_sim:.2f})"
            )

            # Step 5: Decide pass/fail based on similarity
            title_ok = title_sim >= TITLE_MATCH_THRESHOLD
            artist_ok = artist_sim >= ARTIST_MATCH_THRESHOLD

            if title_ok and artist_ok:
                msg = (
                    f"Audio verified: '{matched_title}' by '{matched_artist}' "
                    f"matches expected '{expected_track_name}' by '{expected_artist_name}' "
                    f"(title={title_sim:.0%}, artist={artist_sim:.0%})"
                )
                logger.info(f"AcoustID verification PASSED - {msg}")
                return VerificationResult.PASS, msg

            # Title matches but artist doesn't — could be a cover or collab, skip
            if title_ok:
                # Check if the expected artist appears anywhere in the AcoustID results
                for rec in recordings:
                    if _similarity(expected_artist_name, rec.get('artist') or '') >= ARTIST_MATCH_THRESHOLD:
                        msg = (
                            f"Audio verified: found '{expected_track_name}' by '{expected_artist_name}' "
                            f"in AcoustID results"
                        )
                        logger.info(f"AcoustID verification PASSED (secondary match) - {msg}")
                        return VerificationResult.PASS, msg

                msg = (
                    f"Title matches but artist unclear: "
                    f"AcoustID='{matched_title}' by '{matched_artist}', "
                    f"expected '{expected_track_name}' by '{expected_artist_name}'"
                )
                logger.info(f"AcoustID verification SKIPPED - {msg}")
                return VerificationResult.SKIP, msg

            # Title doesn't match — check ALL recordings for any title/artist match
            # (the best combined match might not be the right one if there are many results)
            for rec in recordings:
                t = rec.get('title') or ''
                a = rec.get('artist') or ''
                if (_similarity(expected_track_name, t) >= TITLE_MATCH_THRESHOLD and
                        _similarity(expected_artist_name, a) >= ARTIST_MATCH_THRESHOLD):
                    msg = (
                        f"Audio verified: found '{t}' by '{a}' in AcoustID results "
                        f"matching expected '{expected_track_name}' by '{expected_artist_name}'"
                    )
                    logger.info(f"AcoustID verification PASSED (scan match) - {msg}")
                    return VerificationResult.PASS, msg

            # Fail open: if the expected title has nothing left to compare
            # after normalization (empty, or punctuation/symbols only), a low
            # title similarity says nothing about the file, so do not FAIL.
            if not _normalize(expected_track_name):
                msg = (
                    f"Expected title '{expected_track_name}' has no comparable text; "
                    f"cannot verify against AcoustID results"
                )
                logger.info(f"AcoustID verification SKIPPED - {msg}")
                return VerificationResult.SKIP, msg

            # No match found — this file is likely wrong
            # Report what AcoustID thinks the file actually is: the recording
            # with the highest fingerprint score (first one on ties).
            top = max(recordings, key=lambda rec: rec.get('score') or 0.0)
            top_title = top.get('title') or '?'
            top_artist = top.get('artist') or '?'

            msg = (
                f"Audio mismatch: file identified as '{top_title}' by '{top_artist}', "
                f"expected '{expected_track_name}' by '{expected_artist_name}' "
                f"(title={title_sim:.0%}, artist={artist_sim:.0%})"
            )
            logger.warning(f"AcoustID verification FAILED - {msg}")
            return VerificationResult.FAIL, msg

        except Exception as e:
            # Any unexpected error -> SKIP (fail open)
            logger.error(f"Unexpected error during AcoustID verification: {e}")
            return VerificationResult.SKIP, f"Verification error: {str(e)}"

    def quick_check_available(self) -> Tuple[bool, str]:
        """
        Quick check if verification is available without doing a full verification.

        Returns:
            Tuple of (is_available, reason)
        """
        return self.acoustid_client.is_available()
|
||||
@ -0,0 +1,295 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
AcoustID Integration Test Script
|
||||
|
||||
Run this script to test the AcoustID verification system before using it in production.
|
||||
It will check:
|
||||
1. fpcalc binary availability
|
||||
2. API key validation
|
||||
3. Fingerprint generation (if audio file provided)
|
||||
4. Full verification flow (if audio file and expected track info provided)
|
||||
|
||||
Usage:
|
||||
python test_acoustid.py # Basic tests
|
||||
python test_acoustid.py path/to/audio.mp3 # Test with audio file
|
||||
python test_acoustid.py path/to/audio.mp3 "Song Title" "Artist Name" # Full test
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import io
|
||||
|
||||
# On Windows, re-wrap the standard streams as UTF-8 (undecodable characters
# are replaced) so printing track/artist names does not raise encoding errors.
if sys.platform == 'win32':
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')

# Put this script's directory first on the import path so the project
# packages imported below resolve.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def print_header(text):
    """Print *text* as a section heading framed by two 60-character rules."""
    rule = "=" * 60
    print("\n" + rule)
    print(f" {text}")
    print(rule)
|
||||
|
||||
|
||||
def print_result(success, message):
    """Print *message* prefixed with [PASS] or [FAIL] depending on *success*."""
    if success:
        icon = "[PASS]"
    else:
        icon = "[FAIL]"
    print(f" {icon} {message}")
|
||||
|
||||
|
||||
def test_chromaprint():
    """
    Report whether a fingerprint backend (pyacoustid plus fpcalc) is usable.

    Returns True when fingerprinting is available, False otherwise (with
    install hints printed).
    """
    print_header("Testing fingerprint backend availability")

    from core.acoustid_client import CHROMAPRINT_AVAILABLE, ACOUSTID_AVAILABLE, FPCALC_PATH

    if not ACOUSTID_AVAILABLE:
        print_result(False, "pyacoustid library not installed!")
        print("\n To install:")
        print(" pip install pyacoustid")
        return False

    if CHROMAPRINT_AVAILABLE:
        # Mention the binary path when one is known.
        if FPCALC_PATH:
            print_result(True, f"fpcalc ready: {FPCALC_PATH}")
        else:
            print_result(True, "Fingerprint backend available")
        return True

    print_result(False, "No fingerprint backend available!")
    for hint in (
        "\n fpcalc will be auto-downloaded on first use.",
        " Or manually install:",
        " - Windows: Auto-download supported",
        " - macOS: brew install chromaprint",
        " - Linux: apt install libchromaprint-tools",
    ):
        print(hint)
    return False
|
||||
|
||||
|
||||
def test_api_key():
    """
    Test if AcoustID API key is configured and valid.

    Prints setup instructions when no key is configured; otherwise shows a
    masked preview of the key and the outcome of
    ``AcoustIDClient.test_api_key()``.

    Returns:
        True if the key was accepted; otherwise the falsy outcome of the
        client check (False when no key is configured).
    """
    print_header("Testing AcoustID API key")

    from core.acoustid_client import AcoustIDClient
    from config.settings import config_manager

    api_key = config_manager.get('acoustid.api_key', '')

    if not api_key:
        print_result(False, "No API key configured in settings")
        print("\n To configure:")
        print(" 1. Get a free API key from https://acoustid.org/new-application")
        print(" 2. Add it in Settings > AcoustID section")
        return False

    # Show only a short prefix: printing the first 8 and last 4 characters
    # would reveal the entire key whenever it is 12 characters or shorter.
    masked_key = f"{api_key[:4]}..." if len(api_key) > 4 else "***"
    print(f" API key found: {masked_key}")

    client = AcoustIDClient()
    success, message = client.test_api_key()

    print_result(success, message)
    return success
|
||||
|
||||
|
||||
def test_enabled():
    """
    Report whether AcoustID verification is switched on in the config.

    Returns the value of the 'acoustid.enabled' setting (default False).
    """
    print_header("Testing AcoustID enabled status")

    from config.settings import config_manager

    enabled = config_manager.get('acoustid.enabled', False)

    if not enabled:
        print_result(False, "AcoustID verification is DISABLED")
        print("\n To enable:")
        print(" 1. Go to Settings > AcoustID section")
        print(" 2. Check 'Enable Download Verification'")
        return enabled

    print_result(True, "AcoustID verification is ENABLED")
    return enabled
|
||||
|
||||
|
||||
def test_availability():
    """
    Run the client's combined availability check and print its outcome.

    Returns the availability flag from ``AcoustIDClient.is_available()``.
    """
    print_header("Testing overall availability")

    from core.acoustid_client import AcoustIDClient

    is_ready, why = AcoustIDClient().is_available()
    print_result(is_ready, why)
    return is_ready
|
||||
|
||||
|
||||
def test_fingerprint_and_lookup(audio_file):
    """Fingerprint *audio_file* and look it up in AcoustID, printing matches.

    Prints up to five of the returned recordings with their MusicBrainz
    links, plus a count of any remaining ones.

    Returns:
        The lookup result from ``AcoustIDClient.fingerprint_and_lookup()``,
        or ``None`` when the file is missing, the client is unavailable, or
        the lookup produced nothing.
    """
    print_header("Testing fingerprint and AcoustID lookup")
    print(f" File: {audio_file}")

    if not os.path.isfile(audio_file):
        print_result(False, f"File not found: {audio_file}")
        return None

    from core.acoustid_client import AcoustIDClient

    acoustid = AcoustIDClient()

    is_ready, why_not = acoustid.is_available()
    if not is_ready:
        print_result(False, f"AcoustID not available: {why_not}")
        return None

    print(" Fingerprinting and looking up (this may take a moment)...")
    lookup = acoustid.fingerprint_and_lookup(audio_file)

    if not lookup:
        print_result(False, "Track not found in AcoustID database")
        print(" This may be a rare/new track not yet fingerprinted.")
        return None

    matches = lookup.get('recordings', [])
    best = lookup.get('best_score', 0)
    print_result(True, f"Found {len(matches)} recording(s) (score: {best:.2f})")

    shown_limit = 5  # Only the first few recordings are listed in detail
    for position, entry in enumerate(matches[:shown_limit], start=1):
        title = entry.get('title', '?')
        artist = entry.get('artist', '?')
        mbid = entry.get('mbid', '?')
        entry_score = entry.get('score', 0)
        print(f' {position}. "{title}" by {artist} (score: {entry_score:.2f})')
        print(f" https://musicbrainz.org/recording/{mbid}")

    if len(matches) > shown_limit:
        print(f" ... and {len(matches) - shown_limit} more")

    return lookup
|
||||
|
||||
|
||||
def test_musicbrainz_lookup(track_name, artist_name):
    """Look up the expected track in MusicBrainz and print what was found.

    Any exception raised while importing, constructing or querying the
    service is reported through ``print_result`` rather than propagated.

    Returns:
        The result of ``MusicBrainzService.match_recording()`` when it is
        truthy, otherwise ``None`` (no match, or an error occurred).
    """
    print_header("Testing MusicBrainz lookup")
    print(f" Track: '{track_name}'")
    print(f" Artist: '{artist_name}'")

    try:
        from database.music_database import MusicDatabase
        from core.musicbrainz_service import MusicBrainzService

        database = MusicDatabase()
        service = MusicBrainzService(database)

        print(" Searching MusicBrainz...")
        match = service.match_recording(track_name, artist_name)

        if not match:
            print_result(False, "No match found in MusicBrainz")
            return None

        recording_id = match.get('mbid')
        confidence = match.get('confidence', 0)
        from_cache = match.get('cached', False)

        print_result(True, f"Found match (confidence: {confidence}%)")
        print(f" MBID: {recording_id}")
        print(f" https://musicbrainz.org/recording/{recording_id}")
        print(f" Cached: {from_cache}")
        return match

    except Exception as exc:
        # Diagnostic script: surface the failure and keep the run going.
        print_result(False, f"Error: {exc}")
        return None
|
||||
|
||||
|
||||
def test_full_verification(audio_file, track_name, artist_name):
    """Run the complete verification of *audio_file* against the expected track.

    Prints the outcome (pass, fail, skip, or an unrecognised result) and
    always returns ``None``.
    """
    print_header("Testing full verification flow")
    print(f" File: {audio_file}")
    print(f" Expected: '{track_name}' by '{artist_name}'")

    from core.acoustid_verification import AcoustIDVerification, VerificationResult

    checker = AcoustIDVerification()

    # Bail out early when the verifier reports it cannot run.
    is_ready, why_not = checker.quick_check_available()
    if not is_ready:
        print_result(False, f"Verification not available: {why_not}")
        return

    print(" Running verification (this may take a moment)...")
    outcome, detail = checker.verify_audio_file(audio_file, track_name, artist_name)

    if outcome == VerificationResult.PASS:
        print_result(True, f"VERIFICATION PASSED: {detail}")
        return

    if outcome == VerificationResult.FAIL:
        print_result(False, f"VERIFICATION FAILED: {detail}")
        return

    if outcome == VerificationResult.SKIP:
        print(f" [SKIP] Verification skipped: {detail}")
        return

    print(f" [????] Unknown result: {outcome.value} - {detail}")
|
||||
|
||||
|
||||
def main():
    """Run the AcoustID diagnostics from the command line.

    Positional command-line arguments (all optional):
        1. path to an audio file, which enables the fingerprint/lookup test
        2. expected song title
        3. expected artist

    The four basic checks (chromaprint, API key, enabled flag, availability)
    always run and are summarised.  The MusicBrainz lookup and the full
    verification run only when both a title and an artist are supplied; the
    full verification additionally requires the availability check to pass.
    """
    print("\n" + "=" * 60)
    print(" ACOUSTID VERIFICATION SYSTEM TEST")
    print("=" * 60)

    # Parse arguments
    audio_file = sys.argv[1] if len(sys.argv) > 1 else None
    track_name = sys.argv[2] if len(sys.argv) > 2 else None
    artist_name = sys.argv[3] if len(sys.argv) > 3 else None

    # Run basic tests
    chromaprint_ok = test_chromaprint()
    api_key_ok = test_api_key()
    enabled_ok = test_enabled()
    available_ok = test_availability()

    # Summary of basic tests
    print_header("Basic Tests Summary")
    print(f" Chromaprint: {'OK' if chromaprint_ok else 'MISSING'}")
    print(f" API key: {'OK' if api_key_ok else 'MISSING/INVALID'}")
    print(f" Enabled: {'YES' if enabled_ok else 'NO'}")
    print(f" Available: {'YES' if available_ok else 'NO'}")

    if not audio_file:
        print("\n" + "-" * 60)
        print(" To test fingerprinting, provide an audio file:")
        print(" python test_acoustid.py path/to/audio.mp3")
        print("\n To test full verification flow:")
        print(" python test_acoustid.py path/to/audio.mp3 \"Song Title\" \"Artist\"")
        print("-" * 60)
        return

    # Test with audio file (combined fingerprint + lookup); the function
    # prints its own report, so the return value is not needed here.
    test_fingerprint_and_lookup(audio_file)

    if track_name and artist_name:
        # Test MusicBrainz lookup
        test_musicbrainz_lookup(track_name, artist_name)

        # Test full verification
        if available_ok:
            test_full_verification(audio_file, track_name, artist_name)
        else:
            print("\n Skipping full verification test (not available)")
    elif track_name is not None:
        # A title was passed but the title/artist pair is incomplete or
        # empty; say so instead of silently ignoring the extra argument.
        print("\n Skipping MusicBrainz and verification tests "
              "(both a song title and an artist are required)")

    # Point to log file
    print("\n" + "-" * 60)
    log_path = Path(__file__).parent / "logs" / "acoustid.log"
    print(f" Detailed logs: {log_path}")
    print("-" * 60 + "\n")
|
||||
|
||||
|
||||
# Run the diagnostics only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
||||
Loading…
Reference in new issue