Add single file support to auto-import worker

Loose audio files in the staging root are now picked up alongside album
folders. Singles are identified via embedded tags, filename parsing
(Artist - Title.ext), or AcoustID fingerprinting, then matched against
the configured metadata source. Confidence-gated processing applies
the same way as album folders (90%+ auto, 70-90% review, <70% manual).
pull/315/head
Broque Thomas 4 weeks ago
parent c20529dbb5
commit d66adb3c6e

@ -1,9 +1,12 @@
"""Auto-Import Worker — watches staging folder, identifies music, and processes automatically.
Scans the staging folder for audio files, groups them by folder (album),
identifies them using tags/folder names/AcoustID, matches to metadata source
tracklists, and processes high-confidence matches through the post-processing
pipeline. Lower-confidence matches are queued for user review.
Scans the staging folder for audio files and album folders, identifies them
using tags/filenames/AcoustID, matches to metadata source tracklists, and
processes high-confidence matches through the post-processing pipeline.
Lower-confidence matches are queued for user review.
Supports both album folders (directories containing audio files) and single
loose audio files in the staging root.
"""
import hashlib
@ -32,6 +35,7 @@ class FolderCandidate:
audio_files: List[str] = field(default_factory=list)
disc_structure: Dict[int, List[str]] = field(default_factory=dict) # disc_num -> files
folder_hash: str = ''
is_single: bool = False # True for loose files in staging root
def _compute_folder_hash(audio_files: List[str]) -> str:
@ -312,7 +316,7 @@ class AutoImportWorker:
return None
def _enumerate_folders(self, staging: str) -> List[FolderCandidate]:
"""Find album folder candidates in staging directory."""
"""Find album folder and single file candidates in staging directory."""
candidates = []
try:
entries = sorted(os.listdir(staging))
@ -321,6 +325,16 @@ class AutoImportWorker:
for entry in entries:
full_path = os.path.join(staging, entry)
# Loose audio file in staging root → single track candidate
if os.path.isfile(full_path) and os.path.splitext(entry)[1].lower() in AUDIO_EXTENSIONS:
folder_hash = _compute_folder_hash([full_path])
candidates.append(FolderCandidate(
path=full_path, name=entry, audio_files=[full_path],
folder_hash=folder_hash, is_single=True
))
continue
if not os.path.isdir(full_path):
continue
@ -395,7 +409,10 @@ class AutoImportWorker:
# ── Identification ──
def _identify_folder(self, candidate: FolderCandidate) -> Optional[Dict]:
"""Identify what album a folder contains. Returns identification dict or None."""
"""Identify what album/track a folder or single file contains."""
if candidate.is_single:
return self._identify_single(candidate)
# Strategy 1: Read tags
tag_result = self._identify_from_tags(candidate)
@ -414,6 +431,141 @@ class AutoImportWorker:
return None
def _identify_single(self, candidate: FolderCandidate) -> Optional[Dict]:
"""Identify a single audio file from tags, filename, or AcoustID."""
file_path = candidate.audio_files[0]
tags = _read_file_tags(file_path)
artist = tags.get('artist', '')
title = tags.get('title', '')
album = tags.get('album', '')
# Fallback: parse filename (Artist - Title.ext)
if not artist or not title:
basename = os.path.splitext(os.path.basename(file_path))[0]
parts = re.split(r'\s*[-–—]\s*', basename, maxsplit=1)
if len(parts) == 2:
artist = artist or parts[0].strip()
title = title or parts[1].strip()
elif not title:
title = basename.strip()
if not title:
return None
# Search metadata source for track
result = self._search_single_track(artist, title, album)
if result:
return result
# Fallback: AcoustID fingerprint
try:
from core.acoustid_client import AcoustIDClient
client = AcoustIDClient()
fp_result = client.fingerprint_and_lookup(file_path)
if fp_result and fp_result.get('recordings'):
best = fp_result['recordings'][0]
fp_artist = best.get('artist', artist)
fp_title = best.get('title', title)
if fp_artist and fp_title:
result = self._search_single_track(fp_artist, fp_title, '')
if result:
result['method'] = 'acoustid'
return result
except Exception:
pass
# If we have artist and title but no metadata match, still return basic identification
if artist and title:
return {
'album_id': None,
'album_name': album or title, # Use title as album name for singles
'artist_name': artist,
'track_name': title,
'image_url': '',
'total_tracks': 1,
'source': 'tags',
'method': 'tags' if tags.get('artist') else 'filename',
'identification_confidence': 0.7 if tags.get('artist') else 0.5,
'is_single': True,
}
return None
def _search_single_track(self, artist: str, title: str, album: str) -> Optional[Dict]:
"""Search metadata source for a single track match."""
try:
from core.metadata_service import get_primary_source, get_client_for_source
source = get_primary_source()
client = get_client_for_source(source)
if not client or not hasattr(client, 'search_tracks'):
return None
query = f"{artist} {title}" if artist else title
results = client.search_tracks(query, limit=5)
if not results:
return None
# Score results
best_result = None
best_score = 0
for r in results:
r_title = getattr(r, 'name', '') or getattr(r, 'title', '') or ''
r_artists = getattr(r, 'artists', [])
r_artist = ''
if r_artists:
a = r_artists[0]
r_artist = a.get('name', str(a)) if isinstance(a, dict) else str(a)
score = _similarity(title, r_title) * 0.6
if artist:
score += _similarity(artist, r_artist) * 0.4
if score > best_score:
best_score = score
best_result = r
if not best_result or best_score < 0.5:
return None
r_artist = ''
r_album = ''
r_album_id = ''
r_image = ''
if hasattr(best_result, 'artists') and best_result.artists:
a = best_result.artists[0]
r_artist = a.get('name', str(a)) if isinstance(a, dict) else str(a)
if hasattr(best_result, 'album'):
alb = best_result.album
if isinstance(alb, dict):
r_album = alb.get('name', '')
r_album_id = alb.get('id', '')
images = alb.get('images', [])
if images:
r_image = images[0].get('url', '') if isinstance(images[0], dict) else str(images[0])
elif isinstance(alb, str):
r_album = alb
return {
'album_id': r_album_id or None,
'album_name': r_album or title,
'artist_name': r_artist or artist or '',
'track_name': getattr(best_result, 'name', '') or title,
'track_id': getattr(best_result, 'id', ''),
'image_url': r_image,
'total_tracks': 1,
'source': source,
'method': 'tags',
'identification_confidence': best_score,
'is_single': True,
}
except Exception as e:
logger.debug(f"Single track search failed for '{artist} - {title}': {e}")
return None
def _identify_from_tags(self, candidate: FolderCandidate) -> Optional[Dict]:
"""Try to identify album from embedded file tags."""
tags_list = []
@ -556,6 +708,27 @@ class AutoImportWorker:
def _match_tracks(self, candidate: FolderCandidate, identification: Dict) -> Optional[Dict]:
"""Match staging files to the identified album's tracklist."""
# Singles: no album tracklist to match against — the file IS the match
if candidate.is_single or identification.get('is_single'):
conf = identification.get('identification_confidence', 0.7)
track_data = {
'name': identification.get('track_name', identification.get('album_name', '')),
'artists': [{'name': identification.get('artist_name', '')}],
'id': identification.get('track_id', ''),
'track_number': 1,
'disc_number': 1,
}
return {
'matches': [{'track': track_data, 'file': candidate.audio_files[0], 'confidence': conf}],
'unmatched_files': [],
'total_tracks': 1,
'matched_count': 1,
'coverage': 1.0,
'confidence': conf,
'album_data': {'id': identification.get('album_id') or '', 'name': identification.get('album_name', ''),
'tracks': {'items': [track_data]}},
}
try:
from core.metadata_service import get_client_for_source, get_album_tracks_for_source
@ -727,7 +900,7 @@ class AutoImportWorker:
context_key = f"auto_import_{candidate.folder_hash}_{track_number}"
context = {
'spotify_artist': {
'id': identification.get('album_id', 'auto_import'),
'id': identification.get('album_id') or 'auto_import',
'name': artist_name,
'genres': [],
},

Loading…
Cancel
Save