diff --git a/core/imports/routes.py b/core/imports/routes.py new file mode 100644 index 00000000..ec274000 --- /dev/null +++ b/core/imports/routes.py @@ -0,0 +1,186 @@ +"""Import/staging controller helpers for Flask-style endpoints.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from typing import Any, Callable, Dict + +from core.imports.staging import ( + AUDIO_EXTENSIONS, + get_import_suggestions_cache, + get_staging_path as _get_staging_path, + read_staging_file_metadata as _read_staging_file_metadata, +) +from utils.logging_config import get_logger + + +module_logger = get_logger("imports.routes") + + +def _default_read_tags(file_path: str): + from mutagen import File as MutagenFile + + return MutagenFile(file_path, easy=True) + + +@dataclass +class ImportRouteRuntime: + """Dependencies needed to service import/staging HTTP endpoints.""" + + get_staging_path: Callable[[], str] = _get_staging_path + read_staging_file_metadata: Callable[[str, str], Dict[str, Any]] = _read_staging_file_metadata + read_tags: Callable[[str], Any] = _default_read_tags + logger: Any = module_logger + + +def staging_files(runtime: ImportRouteRuntime) -> tuple[Dict[str, Any], int]: + """Scan the staging folder and return audio files with tag metadata.""" + try: + staging_path = runtime.get_staging_path() + os.makedirs(staging_path, exist_ok=True) + + files = [] + for root, _dirs, filenames in os.walk(staging_path): + for fname in filenames: + ext = os.path.splitext(fname)[1].lower() + if ext not in AUDIO_EXTENSIONS: + continue + full_path = os.path.join(root, fname) + rel_path = os.path.relpath(full_path, staging_path) + + meta = runtime.read_staging_file_metadata(full_path, rel_path) + + files.append( + { + "filename": fname, + "rel_path": rel_path, + "full_path": full_path, + "title": meta["title"], + "artist": meta["albumartist"] or meta["artist"] or "Unknown Artist", + "album": meta["album"], + "track_number": meta["track_number"], + "disc_number": meta["disc_number"], + "extension": ext, + } + ) + + files.sort(key=lambda f: f["filename"].lower()) + return {"success": True, "files": files, "staging_path": staging_path}, 200 + except Exception as exc: + runtime.logger.error("Error scanning staging files: %s", exc) + return {"success": False, "error": str(exc)}, 500 + + +def staging_groups(runtime: ImportRouteRuntime) -> tuple[Dict[str, Any], int]: + """Auto-detect album groups from staging files based on their tags.""" + try: + staging_path = runtime.get_staging_path() + if not os.path.isdir(staging_path): + return {"success": True, "groups": []}, 200 + + album_groups = {} + for root, _dirs, filenames in os.walk(staging_path): + for fname in filenames: + ext = os.path.splitext(fname)[1].lower() + if ext not in AUDIO_EXTENSIONS: + continue + full_path = os.path.join(root, fname) + rel_path = os.path.relpath(full_path, staging_path) + + meta = runtime.read_staging_file_metadata(full_path, rel_path) + album = meta["album"] + artist = meta["albumartist"] or meta["artist"] + if not album or not artist: + continue + + key = (album.lower().strip(), artist.lower().strip()) + if key not in album_groups: + album_groups[key] = {"album": album.strip(), "artist": artist.strip(), "files": []} + album_groups[key]["files"].append( + { + "filename": fname, + "full_path": full_path, + "title": meta["title"], + "track_number": meta["track_number"], + } + ) + + groups = [] + for group in album_groups.values(): + if len(group["files"]) >= 2: + group["files"].sort(key=lambda f: f.get("track_number") or 999) + groups.append( + { + "album": group["album"], + "artist": group["artist"], + "file_count": len(group["files"]), + "files": group["files"], + "file_paths": [f["full_path"] for f in group["files"]], + } + ) + + groups.sort(key=lambda g: g["file_count"], reverse=True) + return {"success": True, "groups": groups}, 200 + except Exception as exc: + runtime.logger.error("Error building staging groups: %s", exc) + return {"success": False, "error": str(exc)}, 500 + + +def staging_hints(runtime: ImportRouteRuntime) -> tuple[Dict[str, Any], int]: + """Extract album search hints from staging folder tags and folder names.""" + try: + staging_path = runtime.get_staging_path() + if not os.path.isdir(staging_path): + return {"success": True, "hints": []}, 200 + + tag_albums = {} + folder_hints = {} + for root, _dirs, filenames in os.walk(staging_path): + audio_files = [f for f in filenames if os.path.splitext(f)[1].lower() in AUDIO_EXTENSIONS] + if not audio_files: + continue + + rel_dir = os.path.relpath(root, staging_path) + if rel_dir != ".": + top_folder = rel_dir.split(os.sep)[0] + folder_hints[top_folder] = folder_hints.get(top_folder, 0) + len(audio_files) + + for fname in audio_files: + full_path = os.path.join(root, fname) + try: + tags = runtime.read_tags(full_path) + if tags: + album = (tags.get("album") or [None])[0] + artist = (tags.get("artist") or (tags.get("albumartist") or [None]))[0] + if album: + key = (album.strip(), (artist or "").strip()) + tag_albums[key] = tag_albums.get(key, 0) + 1 + except Exception as exc: + runtime.logger.debug("tag read failed: %s", exc) + + queries = [] + seen_queries_lower = set() + + for (album, artist), _count in sorted(tag_albums.items(), key=lambda x: -x[1]): + query = f"{album} {artist}".strip() if artist else album + if query.lower() not in seen_queries_lower: + seen_queries_lower.add(query.lower()) + queries.append(query) + + for folder, _count in sorted(folder_hints.items(), key=lambda x: -x[1]): + query = folder.replace("_", " ") + if query.lower() not in seen_queries_lower: + seen_queries_lower.add(query.lower()) + queries.append(query) + + return {"success": True, "hints": queries[:5]}, 200 + except Exception as exc: + runtime.logger.error("Error getting staging hints: %s", exc) + return {"success": False, "error": str(exc)}, 500 + + +def staging_suggestions() -> tuple[Dict[str, Any], int]: + """Return cached import suggestions and readiness state.""" + cache = get_import_suggestions_cache() + return {"success": True, "suggestions": cache["suggestions"], "ready": cache["built"]}, 200 diff --git a/tests/imports/test_import_routes.py b/tests/imports/test_import_routes.py new file mode 100644 index 00000000..de8c9a8d --- /dev/null +++ b/tests/imports/test_import_routes.py @@ -0,0 +1,230 @@ +import os + +import core.imports.routes as import_routes +from core.imports.routes import ImportRouteRuntime, staging_files, staging_groups, staging_hints, staging_suggestions + + +class _FakeLogger: + def __init__(self): + self.debug_messages = [] + self.error_messages = [] + + def debug(self, msg, *args): + self.debug_messages.append(msg % args if args else msg) + + def error(self, msg, *args): + self.error_messages.append(msg % args if args else msg) + + +def _touch(path): + path.parent.mkdir(parents=True, exist_ok=True) + path.write_bytes(b"") + + +def _metadata_for(files): + def _read_metadata(file_path, rel_path): + return files[rel_path] + + return _read_metadata + + +def test_staging_files_returns_audio_files_with_metadata(tmp_path): + _touch(tmp_path / "Artist" / "02 - Song.flac") + _touch(tmp_path / "cover.jpg") + rel_song = os.path.join("Artist", "02 - Song.flac") + metadata = { + rel_song: { + "title": "Song", + "artist": "Track Artist", + "albumartist": "Album Artist", + "album": "Album", + "track_number": 2, + "disc_number": 1, + } + } + runtime = ImportRouteRuntime( + get_staging_path=lambda: str(tmp_path), + read_staging_file_metadata=_metadata_for(metadata), + logger=_FakeLogger(), + ) + + payload, status = staging_files(runtime) + + assert status == 200 + assert payload["success"] is True + assert payload["staging_path"] == str(tmp_path) + assert len(payload["files"]) == 1 + assert payload["files"] == [ + { + "filename": "02 - Song.flac", + "rel_path": rel_song, + "full_path": str(tmp_path / "Artist" / "02 - Song.flac"), + "title": "Song", + "artist": "Album Artist", + "album": "Album", + "track_number": 2, + "disc_number": 1, + "extension": ".flac", + } + ] + + +def test_staging_groups_only_returns_multi_file_album_groups(tmp_path): + _touch(tmp_path / "a.mp3") + _touch(tmp_path / "b.mp3") + _touch(tmp_path / "single.mp3") + metadata = { + "a.mp3": { + "title": "A", + "artist": "Artist", + "albumartist": "", + "album": "Album", + "track_number": 2, + "disc_number": 1, + }, + "b.mp3": { + "title": "B", + "artist": "Artist", + "albumartist": "", + "album": "Album", + "track_number": 1, + "disc_number": 1, + }, + "single.mp3": { + "title": "Single", + "artist": "Other", + "albumartist": "", + "album": "Other Album", + "track_number": 1, + "disc_number": 1, + }, + } + runtime = ImportRouteRuntime( + get_staging_path=lambda: str(tmp_path), + read_staging_file_metadata=_metadata_for(metadata), + logger=_FakeLogger(), + ) + + payload, status = staging_groups(runtime) + + assert status == 200 + assert payload["success"] is True + assert len(payload["groups"]) == 1 + group = payload["groups"][0] + assert group["album"] == "Album" + assert group["artist"] == "Artist" + assert group["file_count"] == 2 + assert [f["filename"] for f in group["files"]] == ["b.mp3", "a.mp3"] + + +def test_staging_hints_prefers_tag_queries_then_folder_queries(tmp_path): + _touch(tmp_path / "Folder_Album" / "01.mp3") + _touch(tmp_path / "Folder_Album" / "02.mp3") + _touch(tmp_path / "Loose" / "track.flac") + + def _read_tags(file_path): + if file_path.endswith("01.mp3") or file_path.endswith("02.mp3"): + return {"album": ["Tagged Album"], "artist": ["Tagged Artist"]} + return {} + + runtime = ImportRouteRuntime( + get_staging_path=lambda: str(tmp_path), + read_tags=_read_tags, + logger=_FakeLogger(), + ) + + payload, status = staging_hints(runtime) + + assert status == 200 + assert payload == { + "success": True, + "hints": ["Tagged Album Tagged Artist", "Folder Album", "Loose"], + } + + +def test_staging_suggestions_returns_cache_payload(monkeypatch): + monkeypatch.setattr( + import_routes, + "get_import_suggestions_cache", + lambda: {"suggestions": [{"album": "Album"}], "built": True}, + ) + + payload, status = staging_suggestions() + + assert status == 200 + assert payload == { + "success": True, + "suggestions": [{"album": "Album"}], + "ready": True, + } + + +def test_staging_groups_returns_empty_for_missing_staging_path(tmp_path): + runtime = ImportRouteRuntime( + get_staging_path=lambda: str(tmp_path / "missing"), + logger=_FakeLogger(), + ) + + payload, status = staging_groups(runtime) + + assert status == 200 + assert payload == {"success": True, "groups": []} + + +def test_staging_hints_returns_empty_for_missing_staging_path(tmp_path): + runtime = ImportRouteRuntime( + get_staging_path=lambda: str(tmp_path / "missing"), + logger=_FakeLogger(), + ) + + payload, status = staging_hints(runtime) + + assert status == 200 + assert payload == {"success": True, "hints": []} + + +def test_staging_files_returns_error_when_path_resolution_fails(): + logger = _FakeLogger() + runtime = ImportRouteRuntime( + get_staging_path=lambda: (_ for _ in ()).throw(RuntimeError("path boom")), + logger=logger, + ) + + payload, status = staging_files(runtime) + + assert status == 500 + assert payload["success"] is False + assert payload["error"] == "path boom" + assert logger.error_messages == ["Error scanning staging files: path boom"] + + +def test_staging_groups_returns_error_when_metadata_read_fails(tmp_path): + _touch(tmp_path / "a.mp3") + logger = _FakeLogger() + runtime = ImportRouteRuntime( + get_staging_path=lambda: str(tmp_path), + read_staging_file_metadata=lambda _file_path, _rel_path: (_ for _ in ()).throw(RuntimeError("tag boom")), + logger=logger, + ) + + payload, status = staging_groups(runtime) + + assert status == 500 + assert payload["success"] is False + assert payload["error"] == "tag boom" + assert logger.error_messages == ["Error building staging groups: tag boom"] + + +def test_staging_hints_returns_error_when_path_resolution_fails(): + logger = _FakeLogger() + runtime = ImportRouteRuntime( + get_staging_path=lambda: (_ for _ in ()).throw(RuntimeError("hint boom")), + logger=logger, + ) + + payload, status = staging_hints(runtime) + + assert status == 500 + assert payload["success"] is False + assert payload["error"] == "hint boom" + assert logger.error_messages == ["Error getting staging hints: hint boom"] diff --git a/web_server.py b/web_server.py index db0d01f9..989b813a 100644 --- a/web_server.py +++ b/web_server.py @@ -169,7 +169,6 @@ from core.imports.album import ( from core.imports.album_naming import resolve_album_group as _resolve_album_group from core.imports.filename import extract_track_number_from_filename, parse_filename_metadata from core.imports.staging import ( - get_import_suggestions_cache, get_primary_source, get_staging_path, read_staging_file_metadata, @@ -178,6 +177,11 @@ from core.imports.staging import ( search_import_tracks, start_import_suggestions_cache, ) +from core.imports.routes import ImportRouteRuntime as _ImportRouteRuntime +from core.imports.routes import staging_files as _import_staging_files +from core.imports.routes import staging_groups as _import_staging_groups +from core.imports.routes import staging_hints as _import_staging_hints +from core.imports.routes import staging_suggestions as _import_staging_suggestions from core.imports.paths import build_final_path_for_track as _build_final_path_for_track from core.imports.pipeline import build_import_pipeline_runtime as _build_import_pipeline_runtime from core.metadata.common import get_file_lock @@ -34264,174 +34268,26 @@ def repair_job_progress(): # IMPORT / STAGING SYSTEM # ================================================================================================ -AUDIO_EXTENSIONS = {'.mp3', '.flac', '.ogg', '.opus', '.m4a', '.aac', '.wav', '.wma', '.aiff', '.aif', '.ape'} +def _build_import_route_runtime(): + return _ImportRouteRuntime(logger=logger) + @app.route('/api/import/staging/files', methods=['GET']) def import_staging_files(): - """Scan the staging folder and return audio files with tag metadata.""" - try: - staging_path = get_staging_path() - os.makedirs(staging_path, exist_ok=True) - - files = [] - for root, _dirs, filenames in os.walk(staging_path): - for fname in filenames: - ext = os.path.splitext(fname)[1].lower() - if ext not in AUDIO_EXTENSIONS: - continue - full_path = os.path.join(root, fname) - rel_path = os.path.relpath(full_path, staging_path) - - meta = read_staging_file_metadata(full_path, rel_path) - - files.append({ - 'filename': fname, - 'rel_path': rel_path, - 'full_path': full_path, - 'title': meta['title'], - 'artist': meta['albumartist'] or meta['artist'] or 'Unknown Artist', - 'album': meta['album'], - 'track_number': meta['track_number'], - 'disc_number': meta['disc_number'], - 'extension': ext - }) - - # Sort by filename - files.sort(key=lambda f: f['filename'].lower()) - return jsonify({'success': True, 'files': files, 'staging_path': staging_path}) - except Exception as e: - logger.error(f"Error scanning staging files: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 + payload, status = _import_staging_files(_build_import_route_runtime()) + return jsonify(payload), status @app.route('/api/import/staging/groups', methods=['GET']) def import_staging_groups(): - """Auto-detect album groups from staging files based on their tags. - - Groups files by (album_tag, artist) where both are non-empty and at least 2 files share - the same album+artist combo. Returns groups sorted by file count descending. - """ - try: - staging_path = get_staging_path() - if not os.path.isdir(staging_path): - return jsonify({'success': True, 'groups': []}) - - # Scan files and group by album+artist tags - album_groups = {} # (album_lower, artist_lower) -> {album, artist, files: []} - for root, _dirs, filenames in os.walk(staging_path): - for fname in filenames: - ext = os.path.splitext(fname)[1].lower() - if ext not in AUDIO_EXTENSIONS: - continue - full_path = os.path.join(root, fname) - rel_path = os.path.relpath(full_path, staging_path) - - meta = read_staging_file_metadata(full_path, rel_path) - album = meta['album'] - artist = meta['albumartist'] or meta['artist'] - if not album or not artist: - continue - - key = (album.lower().strip(), artist.lower().strip()) - if key not in album_groups: - album_groups[key] = { - 'album': album.strip(), - 'artist': artist.strip(), - 'files': [] - } - album_groups[key]['files'].append({ - 'filename': fname, - 'full_path': full_path, - 'title': meta['title'], - 'track_number': meta['track_number'], - }) - - # Only return groups with 2+ files - groups = [] - for group in album_groups.values(): - if len(group['files']) >= 2: - group['files'].sort(key=lambda f: f.get('track_number') or 999) - groups.append({ - 'album': group['album'], - 'artist': group['artist'], - 'file_count': len(group['files']), - 'files': group['files'], - 'file_paths': [f['full_path'] for f in group['files']], - }) - - # Sort by file count descending - groups.sort(key=lambda g: g['file_count'], reverse=True) - - return jsonify({'success': True, 'groups': groups}) - except Exception as e: - logger.error(f"Error building staging groups: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 + payload, status = _import_staging_groups(_build_import_route_runtime()) + return jsonify(payload), status @app.route('/api/import/staging/hints', methods=['GET']) def import_staging_hints(): - """Extract album search hints from staging folder (tags + folder names). Fast — no Spotify calls.""" - try: - staging_path = get_staging_path() - if not os.path.isdir(staging_path): - return jsonify({'success': True, 'hints': []}) - - # Collect hints from tags and folder structure - tag_albums = {} # (album, artist) -> file count - folder_hints = {} # subfolder name -> file count - - for root, _dirs, filenames in os.walk(staging_path): - audio_files = [f for f in filenames if os.path.splitext(f)[1].lower() in AUDIO_EXTENSIONS] - if not audio_files: - continue - - # Folder-based hint: use immediate subfolder name relative to staging - rel_dir = os.path.relpath(root, staging_path) - if rel_dir != '.': - # Use the top-level subfolder as the hint - top_folder = rel_dir.split(os.sep)[0] - folder_hints[top_folder] = folder_hints.get(top_folder, 0) + len(audio_files) - - # Tag-based hints - for fname in audio_files: - full_path = os.path.join(root, fname) - try: - from mutagen import File as MutagenFile - tags = MutagenFile(full_path, easy=True) - if tags: - album = (tags.get('album') or [None])[0] - artist = (tags.get('artist') or (tags.get('albumartist') or [None]))[0] - if album: - key = (album.strip(), (artist or '').strip()) - tag_albums[key] = tag_albums.get(key, 0) + 1 - except Exception as e: - logger.debug("tag read failed: %s", e) - - # Build search queries, prioritizing tag-based hints (more specific) - queries = [] - seen_queries_lower = set() - - # Tag-based: sort by file count descending - for (album, artist), _count in sorted(tag_albums.items(), key=lambda x: -x[1]): - q = f"{album} {artist}".strip() if artist else album - if q.lower() not in seen_queries_lower: - seen_queries_lower.add(q.lower()) - queries.append(q) - - # Folder-based: parse "Artist - Album" pattern or use as-is - for folder, _count in sorted(folder_hints.items(), key=lambda x: -x[1]): - q = folder.replace('_', ' ') - if q.lower() not in seen_queries_lower: - seen_queries_lower.add(q.lower()) - queries.append(q) - - # Cap at 5 queries to keep it fast - queries = queries[:5] - - return jsonify({'success': True, 'hints': queries}) - except Exception as e: - logger.error(f"Error getting staging hints: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 + payload, status = _import_staging_hints(_build_import_route_runtime()) + return jsonify(payload), status @app.route('/api/import/search/albums', methods=['GET']) @@ -34942,13 +34798,8 @@ def auto_import_clear_completed(): @app.route('/api/import/staging/suggestions', methods=['GET']) def import_staging_suggestions(): - """Return cached import suggestions. If cache isn't built yet, returns partial/empty with a flag.""" - cache = get_import_suggestions_cache() - return jsonify({ - 'success': True, - 'suggestions': cache['suggestions'], - 'ready': cache['built'], - }) + payload, status = _import_staging_suggestions() + return jsonify(payload), status # ================================================================================================