diff --git a/core/auto_import_worker.py b/core/auto_import_worker.py index 0cb423f2..b870e2c3 100644 --- a/core/auto_import_worker.py +++ b/core/auto_import_worker.py @@ -1,9 +1,12 @@ """Auto-Import Worker — watches staging folder, identifies music, and processes automatically. -Scans the staging folder for audio files, groups them by folder (album), -identifies them using tags/folder names/AcoustID, matches to metadata source -tracklists, and processes high-confidence matches through the post-processing -pipeline. Lower-confidence matches are queued for user review. +Scans the staging folder for audio files and album folders, identifies them +using tags/filenames/AcoustID, matches to metadata source tracklists, and +processes high-confidence matches through the post-processing pipeline. +Lower-confidence matches are queued for user review. + +Supports both album folders (directories containing audio files) and single +loose audio files in the staging root. """ import hashlib @@ -32,6 +35,7 @@ class FolderCandidate: audio_files: List[str] = field(default_factory=list) disc_structure: Dict[int, List[str]] = field(default_factory=dict) # disc_num -> files folder_hash: str = '' + is_single: bool = False # True for loose files in staging root def _compute_folder_hash(audio_files: List[str]) -> str: @@ -312,7 +316,7 @@ class AutoImportWorker: return None def _enumerate_folders(self, staging: str) -> List[FolderCandidate]: - """Find album folder candidates in staging directory.""" + """Find album folder and single file candidates in staging directory.""" candidates = [] try: entries = sorted(os.listdir(staging)) @@ -321,6 +325,16 @@ class AutoImportWorker: for entry in entries: full_path = os.path.join(staging, entry) + + # Loose audio file in staging root → single track candidate + if os.path.isfile(full_path) and os.path.splitext(entry)[1].lower() in AUDIO_EXTENSIONS: + folder_hash = _compute_folder_hash([full_path]) + candidates.append(FolderCandidate( + path=full_path, name=entry, audio_files=[full_path], + folder_hash=folder_hash, is_single=True + )) + continue + if not os.path.isdir(full_path): continue @@ -395,7 +409,10 @@ class AutoImportWorker: # ── Identification ── def _identify_folder(self, candidate: FolderCandidate) -> Optional[Dict]: - """Identify what album a folder contains. Returns identification dict or None.""" + """Identify what album/track a folder or single file contains.""" + + if candidate.is_single: + return self._identify_single(candidate) # Strategy 1: Read tags tag_result = self._identify_from_tags(candidate) @@ -414,6 +431,141 @@ class AutoImportWorker: return None + def _identify_single(self, candidate: FolderCandidate) -> Optional[Dict]: + """Identify a single audio file from tags, filename, or AcoustID.""" + file_path = candidate.audio_files[0] + tags = _read_file_tags(file_path) + + artist = tags.get('artist', '') + title = tags.get('title', '') + album = tags.get('album', '') + + # Fallback: parse filename (Artist - Title.ext) + if not artist or not title: + basename = os.path.splitext(os.path.basename(file_path))[0] + parts = re.split(r'\s*[-–—]\s*', basename, maxsplit=1) + if len(parts) == 2: + artist = artist or parts[0].strip() + title = title or parts[1].strip() + elif not title: + title = basename.strip() + + if not title: + return None + + # Search metadata source for track + result = self._search_single_track(artist, title, album) + if result: + return result + + # Fallback: AcoustID fingerprint + try: + from core.acoustid_client import AcoustIDClient + client = AcoustIDClient() + fp_result = client.fingerprint_and_lookup(file_path) + if fp_result and fp_result.get('recordings'): + best = fp_result['recordings'][0] + fp_artist = best.get('artist', artist) + fp_title = best.get('title', title) + if fp_artist and fp_title: + result = self._search_single_track(fp_artist, fp_title, '') + if result: + result['method'] = 'acoustid' + return result + except Exception: + pass + + # If we have artist and title but no metadata match, still return basic identification + if artist and title: + return { + 'album_id': None, + 'album_name': album or title, # Use title as album name for singles + 'artist_name': artist, + 'track_name': title, + 'image_url': '', + 'total_tracks': 1, + 'source': 'tags', + 'method': 'tags' if tags.get('artist') else 'filename', + 'identification_confidence': 0.7 if tags.get('artist') else 0.5, + 'is_single': True, + } + + return None + + def _search_single_track(self, artist: str, title: str, album: str) -> Optional[Dict]: + """Search metadata source for a single track match.""" + try: + from core.metadata_service import get_primary_source, get_client_for_source + + source = get_primary_source() + client = get_client_for_source(source) + if not client or not hasattr(client, 'search_tracks'): + return None + + query = f"{artist} {title}" if artist else title + results = client.search_tracks(query, limit=5) + if not results: + return None + + # Score results + best_result = None + best_score = 0 + + for r in results: + r_title = getattr(r, 'name', '') or getattr(r, 'title', '') or '' + r_artists = getattr(r, 'artists', []) + r_artist = '' + if r_artists: + a = r_artists[0] + r_artist = a.get('name', str(a)) if isinstance(a, dict) else str(a) + + score = _similarity(title, r_title) * 0.6 + if artist: + score += _similarity(artist, r_artist) * 0.4 + + if score > best_score: + best_score = score + best_result = r + + if not best_result or best_score < 0.5: + return None + + r_artist = '' + r_album = '' + r_album_id = '' + r_image = '' + if hasattr(best_result, 'artists') and best_result.artists: + a = best_result.artists[0] + r_artist = a.get('name', str(a)) if isinstance(a, dict) else str(a) + if hasattr(best_result, 'album'): + alb = best_result.album + if isinstance(alb, dict): + r_album = alb.get('name', '') + r_album_id = alb.get('id', '') + images = alb.get('images', []) + if images: + r_image = images[0].get('url', '') if isinstance(images[0], dict) else str(images[0]) + elif isinstance(alb, str): + r_album = alb + + return { + 'album_id': r_album_id or None, + 'album_name': r_album or title, + 'artist_name': r_artist or artist or '', + 'track_name': getattr(best_result, 'name', '') or title, + 'track_id': getattr(best_result, 'id', ''), + 'image_url': r_image, + 'total_tracks': 1, + 'source': source, + 'method': 'tags', + 'identification_confidence': best_score, + 'is_single': True, + } + + except Exception as e: + logger.debug(f"Single track search failed for '{artist} - {title}': {e}") + return None + def _identify_from_tags(self, candidate: FolderCandidate) -> Optional[Dict]: """Try to identify album from embedded file tags.""" tags_list = [] @@ -556,6 +708,27 @@ class AutoImportWorker: def _match_tracks(self, candidate: FolderCandidate, identification: Dict) -> Optional[Dict]: """Match staging files to the identified album's tracklist.""" + # Singles: no album tracklist to match against — the file IS the match + if candidate.is_single or identification.get('is_single'): + conf = identification.get('identification_confidence', 0.7) + track_data = { + 'name': identification.get('track_name', identification.get('album_name', '')), + 'artists': [{'name': identification.get('artist_name', '')}], + 'id': identification.get('track_id', ''), + 'track_number': 1, + 'disc_number': 1, + } + return { + 'matches': [{'track': track_data, 'file': candidate.audio_files[0], 'confidence': conf}], + 'unmatched_files': [], + 'total_tracks': 1, + 'matched_count': 1, + 'coverage': 1.0, + 'confidence': conf, + 'album_data': {'id': identification.get('album_id') or '', 'name': identification.get('album_name', ''), + 'tracks': {'items': [track_data]}}, + } + try: from core.metadata_service import get_client_for_source, get_album_tracks_for_source @@ -727,7 +900,7 @@ class AutoImportWorker: context_key = f"auto_import_{candidate.folder_hash}_{track_number}" context = { 'spotify_artist': { - 'id': identification.get('album_id', 'auto_import'), + 'id': identification.get('album_id') or 'auto_import', 'name': artist_name, 'genres': [], },