From d703d331785c9c51c0f6ab26a2dcb3ac2aaca22f Mon Sep 17 00:00:00 2001 From: Broque Thomas <26755000+Nezreka@users.noreply.github.com> Date: Wed, 13 May 2026 19:50:13 -0700 Subject: [PATCH] Extract import staging route helpers Move import staging files/groups/hints/suggestions controller logic out of web_server.py and into core.imports.routes behind an ImportRouteRuntime dependency object. Keep the existing Flask routes as thin compatibility wrappers so the UI endpoint surface stays unchanged. Add focused tests for staging file filtering, album grouping, hint generation, cached suggestions, empty missing staging paths, and error payloads from failed path/metadata reads. Verification: py_compile passed for web_server.py, core/imports/routes.py, and tests/imports/test_import_routes.py. A bundled-Python smoke pass covered the extracted helper behavior; pytest was not available in this Windows shell because the bundled Python lacks pytest and the repo venv is WSL/Linux-only here. --- core/imports/routes.py | 186 ++++++++++++++++++++++ tests/imports/test_import_routes.py | 230 ++++++++++++++++++++++++++++ web_server.py | 181 ++-------------------- 3 files changed, 432 insertions(+), 165 deletions(-) create mode 100644 core/imports/routes.py create mode 100644 tests/imports/test_import_routes.py diff --git a/core/imports/routes.py b/core/imports/routes.py new file mode 100644 index 00000000..ec274000 --- /dev/null +++ b/core/imports/routes.py @@ -0,0 +1,186 @@ +"""Import/staging controller helpers for Flask-style endpoints.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from typing import Any, Callable, Dict + +from core.imports.staging import ( + AUDIO_EXTENSIONS, + get_import_suggestions_cache, + get_staging_path as _get_staging_path, + read_staging_file_metadata as _read_staging_file_metadata, +) +from utils.logging_config import get_logger + + +module_logger = get_logger("imports.routes") + + +def _default_read_tags(file_path: str): + from mutagen import File as MutagenFile + + return MutagenFile(file_path, easy=True) + + +@dataclass +class ImportRouteRuntime: + """Dependencies needed to service import/staging HTTP endpoints.""" + + get_staging_path: Callable[[], str] = _get_staging_path + read_staging_file_metadata: Callable[[str, str], Dict[str, Any]] = _read_staging_file_metadata + read_tags: Callable[[str], Any] = _default_read_tags + logger: Any = module_logger + + +def staging_files(runtime: ImportRouteRuntime) -> tuple[Dict[str, Any], int]: + """Scan the staging folder and return audio files with tag metadata.""" + try: + staging_path = runtime.get_staging_path() + os.makedirs(staging_path, exist_ok=True) + + files = [] + for root, _dirs, filenames in os.walk(staging_path): + for fname in filenames: + ext = os.path.splitext(fname)[1].lower() + if ext not in AUDIO_EXTENSIONS: + continue + full_path = os.path.join(root, fname) + rel_path = os.path.relpath(full_path, staging_path) + + meta = runtime.read_staging_file_metadata(full_path, rel_path) + + files.append( + { + "filename": fname, + "rel_path": rel_path, + "full_path": full_path, + "title": meta["title"], + "artist": meta["albumartist"] or meta["artist"] or "Unknown Artist", + "album": meta["album"], + "track_number": meta["track_number"], + "disc_number": meta["disc_number"], + "extension": ext, + } + ) + + files.sort(key=lambda f: f["filename"].lower()) + return {"success": True, "files": files, "staging_path": staging_path}, 200 + except Exception as exc: + runtime.logger.error("Error scanning staging files: %s", exc) + return {"success": False, "error": str(exc)}, 500 + + +def staging_groups(runtime: ImportRouteRuntime) -> tuple[Dict[str, Any], int]: + """Auto-detect album groups from staging files based on their tags.""" + try: + staging_path = runtime.get_staging_path() + if not os.path.isdir(staging_path): + return {"success": True, "groups": []}, 200 + + album_groups = {} + for root, _dirs, filenames in os.walk(staging_path): + for fname in filenames: + ext = os.path.splitext(fname)[1].lower() + if ext not in AUDIO_EXTENSIONS: + continue + full_path = os.path.join(root, fname) + rel_path = os.path.relpath(full_path, staging_path) + + meta = runtime.read_staging_file_metadata(full_path, rel_path) + album = meta["album"] + artist = meta["albumartist"] or meta["artist"] + if not album or not artist: + continue + + key = (album.lower().strip(), artist.lower().strip()) + if key not in album_groups: + album_groups[key] = {"album": album.strip(), "artist": artist.strip(), "files": []} + album_groups[key]["files"].append( + { + "filename": fname, + "full_path": full_path, + "title": meta["title"], + "track_number": meta["track_number"], + } + ) + + groups = [] + for group in album_groups.values(): + if len(group["files"]) >= 2: + group["files"].sort(key=lambda f: f.get("track_number") or 999) + groups.append( + { + "album": group["album"], + "artist": group["artist"], + "file_count": len(group["files"]), + "files": group["files"], + "file_paths": [f["full_path"] for f in group["files"]], + } + ) + + groups.sort(key=lambda g: g["file_count"], reverse=True) + return {"success": True, "groups": groups}, 200 + except Exception as exc: + runtime.logger.error("Error building staging groups: %s", exc) + return {"success": False, "error": str(exc)}, 500 + + +def staging_hints(runtime: ImportRouteRuntime) -> tuple[Dict[str, Any], int]: + """Extract album search hints from staging folder tags and folder names.""" + try: + staging_path = runtime.get_staging_path() + if not os.path.isdir(staging_path): + return {"success": True, "hints": []}, 200 + + tag_albums = {} + folder_hints = {} + for root, _dirs, filenames in os.walk(staging_path): + audio_files = [f for f in filenames if os.path.splitext(f)[1].lower() in AUDIO_EXTENSIONS] + if not audio_files: + continue + + rel_dir = os.path.relpath(root, staging_path) + if rel_dir != ".": + top_folder = rel_dir.split(os.sep)[0] + folder_hints[top_folder] = folder_hints.get(top_folder, 0) + len(audio_files) + + for fname in audio_files: + full_path = os.path.join(root, fname) + try: + tags = runtime.read_tags(full_path) + if tags: + album = (tags.get("album") or [None])[0] + artist = (tags.get("artist") or (tags.get("albumartist") or [None]))[0] + if album: + key = (album.strip(), (artist or "").strip()) + tag_albums[key] = tag_albums.get(key, 0) + 1 + except Exception as exc: + runtime.logger.debug("tag read failed: %s", exc) + + queries = [] + seen_queries_lower = set() + + for (album, artist), _count in sorted(tag_albums.items(), key=lambda x: -x[1]): + query = f"{album} {artist}".strip() if artist else album + if query.lower() not in seen_queries_lower: + seen_queries_lower.add(query.lower()) + queries.append(query) + + for folder, _count in sorted(folder_hints.items(), key=lambda x: -x[1]): + query = folder.replace("_", " ") + if query.lower() not in seen_queries_lower: + seen_queries_lower.add(query.lower()) + queries.append(query) + + return {"success": True, "hints": queries[:5]}, 200 + except Exception as exc: + runtime.logger.error("Error getting staging hints: %s", exc) + return {"success": False, "error": str(exc)}, 500 + + +def staging_suggestions() -> tuple[Dict[str, Any], int]: + """Return cached import suggestions and readiness state.""" + cache = get_import_suggestions_cache() + return {"success": True, "suggestions": cache["suggestions"], "ready": cache["built"]}, 200 diff --git a/tests/imports/test_import_routes.py b/tests/imports/test_import_routes.py new file mode 100644 index 00000000..de8c9a8d --- /dev/null +++ b/tests/imports/test_import_routes.py @@ -0,0 +1,230 @@ +import os + +import core.imports.routes as import_routes +from core.imports.routes import ImportRouteRuntime, staging_files, staging_groups, staging_hints, staging_suggestions + + +class _FakeLogger: + def __init__(self): + self.debug_messages = [] + self.error_messages = [] + + def debug(self, msg, *args): + self.debug_messages.append(msg % args if args else msg) + + def error(self, msg, *args): + self.error_messages.append(msg % args if args else msg) + + +def _touch(path): + path.parent.mkdir(parents=True, exist_ok=True) + path.write_bytes(b"") + + +def _metadata_for(files): + def _read_metadata(file_path, rel_path): + return files[rel_path] + + return _read_metadata + + +def test_staging_files_returns_audio_files_with_metadata(tmp_path): + _touch(tmp_path / "Artist" / "02 - Song.flac") + _touch(tmp_path / "cover.jpg") + rel_song = os.path.join("Artist", "02 - Song.flac") + metadata = { + rel_song: { + "title": "Song", + "artist": "Track Artist", + "albumartist": "Album Artist", + "album": "Album", + "track_number": 2, + "disc_number": 1, + } + } + runtime = ImportRouteRuntime( + get_staging_path=lambda: str(tmp_path), + read_staging_file_metadata=_metadata_for(metadata), + logger=_FakeLogger(), + ) + + payload, status = staging_files(runtime) + + assert status == 200 + assert payload["success"] is True + assert payload["staging_path"] == str(tmp_path) + assert len(payload["files"]) == 1 + assert payload["files"] == [ + { + "filename": "02 - Song.flac", + "rel_path": rel_song, + "full_path": str(tmp_path / "Artist" / "02 - Song.flac"), + "title": "Song", + "artist": "Album Artist", + "album": "Album", + "track_number": 2, + "disc_number": 1, + "extension": ".flac", + } + ] + + +def test_staging_groups_only_returns_multi_file_album_groups(tmp_path): + _touch(tmp_path / "a.mp3") + _touch(tmp_path / "b.mp3") + _touch(tmp_path / "single.mp3") + metadata = { + "a.mp3": { + "title": "A", + "artist": "Artist", + "albumartist": "", + "album": "Album", + "track_number": 2, + "disc_number": 1, + }, + "b.mp3": { + "title": "B", + "artist": "Artist", + "albumartist": "", + "album": "Album", + "track_number": 1, + "disc_number": 1, + }, + "single.mp3": { + "title": "Single", + "artist": "Other", + "albumartist": "", + "album": "Other Album", + "track_number": 1, + "disc_number": 1, + }, + } + runtime = ImportRouteRuntime( + get_staging_path=lambda: str(tmp_path), + read_staging_file_metadata=_metadata_for(metadata), + logger=_FakeLogger(), + ) + + payload, status = staging_groups(runtime) + + assert status == 200 + assert payload["success"] is True + assert len(payload["groups"]) == 1 + group = payload["groups"][0] + assert group["album"] == "Album" + assert group["artist"] == "Artist" + assert group["file_count"] == 2 + assert [f["filename"] for f in group["files"]] == ["b.mp3", "a.mp3"] + + +def test_staging_hints_prefers_tag_queries_then_folder_queries(tmp_path): + _touch(tmp_path / "Folder_Album" / "01.mp3") + _touch(tmp_path / "Folder_Album" / "02.mp3") + _touch(tmp_path / "Loose" / "track.flac") + + def _read_tags(file_path): + if file_path.endswith("01.mp3") or file_path.endswith("02.mp3"): + return {"album": ["Tagged Album"], "artist": ["Tagged Artist"]} + return {} + + runtime = ImportRouteRuntime( + get_staging_path=lambda: str(tmp_path), + read_tags=_read_tags, + logger=_FakeLogger(), + ) + + payload, status = staging_hints(runtime) + + assert status == 200 + assert payload == { + "success": True, + "hints": ["Tagged Album Tagged Artist", "Folder Album", "Loose"], + } + + +def test_staging_suggestions_returns_cache_payload(monkeypatch): + monkeypatch.setattr( + import_routes, + "get_import_suggestions_cache", + lambda: {"suggestions": [{"album": "Album"}], "built": True}, + ) + + payload, status = staging_suggestions() + + assert status == 200 + assert payload == { + "success": True, + "suggestions": [{"album": "Album"}], + "ready": True, + } + + +def test_staging_groups_returns_empty_for_missing_staging_path(tmp_path): + runtime = ImportRouteRuntime( + get_staging_path=lambda: str(tmp_path / "missing"), + logger=_FakeLogger(), + ) + + payload, status = staging_groups(runtime) + + assert status == 200 + assert payload == {"success": True, "groups": []} + + +def test_staging_hints_returns_empty_for_missing_staging_path(tmp_path): + runtime = ImportRouteRuntime( + get_staging_path=lambda: str(tmp_path / "missing"), + logger=_FakeLogger(), + ) + + payload, status = staging_hints(runtime) + + assert status == 200 + assert payload == {"success": True, "hints": []} + + +def test_staging_files_returns_error_when_path_resolution_fails(): + logger = _FakeLogger() + runtime = ImportRouteRuntime( + get_staging_path=lambda: (_ for _ in ()).throw(RuntimeError("path boom")), + logger=logger, + ) + + payload, status = staging_files(runtime) + + assert status == 500 + assert payload["success"] is False + assert payload["error"] == "path boom" + assert logger.error_messages == ["Error scanning staging files: path boom"] + + +def test_staging_groups_returns_error_when_metadata_read_fails(tmp_path): + _touch(tmp_path / "a.mp3") + logger = _FakeLogger() + runtime = ImportRouteRuntime( + get_staging_path=lambda: str(tmp_path), + read_staging_file_metadata=lambda _file_path, _rel_path: (_ for _ in ()).throw(RuntimeError("tag boom")), + logger=logger, + ) + + payload, status = staging_groups(runtime) + + assert status == 500 + assert payload["success"] is False + assert payload["error"] == "tag boom" + assert logger.error_messages == ["Error building staging groups: tag boom"] + + +def test_staging_hints_returns_error_when_path_resolution_fails(): + logger = _FakeLogger() + runtime = ImportRouteRuntime( + get_staging_path=lambda: (_ for _ in ()).throw(RuntimeError("hint boom")), + logger=logger, + ) + + payload, status = staging_hints(runtime) + + assert status == 500 + assert payload["success"] is False + assert payload["error"] == "hint boom" + assert logger.error_messages == ["Error getting staging hints: hint boom"] diff --git a/web_server.py b/web_server.py index db0d01f9..989b813a 100644 --- a/web_server.py +++ b/web_server.py @@ -169,7 +169,6 @@ from core.imports.album import ( from core.imports.album_naming import resolve_album_group as _resolve_album_group from core.imports.filename import extract_track_number_from_filename, parse_filename_metadata from core.imports.staging import ( - get_import_suggestions_cache, get_primary_source, get_staging_path, read_staging_file_metadata, @@ -178,6 +177,11 @@ from core.imports.staging import ( search_import_tracks, start_import_suggestions_cache, ) +from core.imports.routes import ImportRouteRuntime as _ImportRouteRuntime +from core.imports.routes import staging_files as _import_staging_files +from core.imports.routes import staging_groups as _import_staging_groups +from core.imports.routes import staging_hints as _import_staging_hints +from core.imports.routes import staging_suggestions as _import_staging_suggestions from core.imports.paths import build_final_path_for_track as _build_final_path_for_track from core.imports.pipeline import build_import_pipeline_runtime as _build_import_pipeline_runtime from core.metadata.common import get_file_lock @@ -34264,174 +34268,26 @@ def repair_job_progress(): # IMPORT / STAGING SYSTEM # ================================================================================================ -AUDIO_EXTENSIONS = {'.mp3', '.flac', '.ogg', '.opus', '.m4a', '.aac', '.wav', '.wma', '.aiff', '.aif', '.ape'} +def _build_import_route_runtime(): + return _ImportRouteRuntime(logger=logger) + @app.route('/api/import/staging/files', methods=['GET']) def import_staging_files(): - """Scan the staging folder and return audio files with tag metadata.""" - try: - staging_path = get_staging_path() - os.makedirs(staging_path, exist_ok=True) - - files = [] - for root, _dirs, filenames in os.walk(staging_path): - for fname in filenames: - ext = os.path.splitext(fname)[1].lower() - if ext not in AUDIO_EXTENSIONS: - continue - full_path = os.path.join(root, fname) - rel_path = os.path.relpath(full_path, staging_path) - - meta = read_staging_file_metadata(full_path, rel_path) - - files.append({ - 'filename': fname, - 'rel_path': rel_path, - 'full_path': full_path, - 'title': meta['title'], - 'artist': meta['albumartist'] or meta['artist'] or 'Unknown Artist', - 'album': meta['album'], - 'track_number': meta['track_number'], - 'disc_number': meta['disc_number'], - 'extension': ext - }) - - # Sort by filename - files.sort(key=lambda f: f['filename'].lower()) - return jsonify({'success': True, 'files': files, 'staging_path': staging_path}) - except Exception as e: - logger.error(f"Error scanning staging files: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 + payload, status = _import_staging_files(_build_import_route_runtime()) + return jsonify(payload), status @app.route('/api/import/staging/groups', methods=['GET']) def import_staging_groups(): - """Auto-detect album groups from staging files based on their tags. - - Groups files by (album_tag, artist) where both are non-empty and at least 2 files share - the same album+artist combo. Returns groups sorted by file count descending. - """ - try: - staging_path = get_staging_path() - if not os.path.isdir(staging_path): - return jsonify({'success': True, 'groups': []}) - - # Scan files and group by album+artist tags - album_groups = {} # (album_lower, artist_lower) -> {album, artist, files: []} - for root, _dirs, filenames in os.walk(staging_path): - for fname in filenames: - ext = os.path.splitext(fname)[1].lower() - if ext not in AUDIO_EXTENSIONS: - continue - full_path = os.path.join(root, fname) - rel_path = os.path.relpath(full_path, staging_path) - - meta = read_staging_file_metadata(full_path, rel_path) - album = meta['album'] - artist = meta['albumartist'] or meta['artist'] - if not album or not artist: - continue - - key = (album.lower().strip(), artist.lower().strip()) - if key not in album_groups: - album_groups[key] = { - 'album': album.strip(), - 'artist': artist.strip(), - 'files': [] - } - album_groups[key]['files'].append({ - 'filename': fname, - 'full_path': full_path, - 'title': meta['title'], - 'track_number': meta['track_number'], - }) - - # Only return groups with 2+ files - groups = [] - for group in album_groups.values(): - if len(group['files']) >= 2: - group['files'].sort(key=lambda f: f.get('track_number') or 999) - groups.append({ - 'album': group['album'], - 'artist': group['artist'], - 'file_count': len(group['files']), - 'files': group['files'], - 'file_paths': [f['full_path'] for f in group['files']], - }) - - # Sort by file count descending - groups.sort(key=lambda g: g['file_count'], reverse=True) - - return jsonify({'success': True, 'groups': groups}) - except Exception as e: - logger.error(f"Error building staging groups: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 + payload, status = _import_staging_groups(_build_import_route_runtime()) + return jsonify(payload), status @app.route('/api/import/staging/hints', methods=['GET']) def import_staging_hints(): - """Extract album search hints from staging folder (tags + folder names). Fast — no Spotify calls.""" - try: - staging_path = get_staging_path() - if not os.path.isdir(staging_path): - return jsonify({'success': True, 'hints': []}) - - # Collect hints from tags and folder structure - tag_albums = {} # (album, artist) -> file count - folder_hints = {} # subfolder name -> file count - - for root, _dirs, filenames in os.walk(staging_path): - audio_files = [f for f in filenames if os.path.splitext(f)[1].lower() in AUDIO_EXTENSIONS] - if not audio_files: - continue - - # Folder-based hint: use immediate subfolder name relative to staging - rel_dir = os.path.relpath(root, staging_path) - if rel_dir != '.': - # Use the top-level subfolder as the hint - top_folder = rel_dir.split(os.sep)[0] - folder_hints[top_folder] = folder_hints.get(top_folder, 0) + len(audio_files) - - # Tag-based hints - for fname in audio_files: - full_path = os.path.join(root, fname) - try: - from mutagen import File as MutagenFile - tags = MutagenFile(full_path, easy=True) - if tags: - album = (tags.get('album') or [None])[0] - artist = (tags.get('artist') or (tags.get('albumartist') or [None]))[0] - if album: - key = (album.strip(), (artist or '').strip()) - tag_albums[key] = tag_albums.get(key, 0) + 1 - except Exception as e: - logger.debug("tag read failed: %s", e) - - # Build search queries, prioritizing tag-based hints (more specific) - queries = [] - seen_queries_lower = set() - - # Tag-based: sort by file count descending - for (album, artist), _count in sorted(tag_albums.items(), key=lambda x: -x[1]): - q = f"{album} {artist}".strip() if artist else album - if q.lower() not in seen_queries_lower: - seen_queries_lower.add(q.lower()) - queries.append(q) - - # Folder-based: parse "Artist - Album" pattern or use as-is - for folder, _count in sorted(folder_hints.items(), key=lambda x: -x[1]): - q = folder.replace('_', ' ') - if q.lower() not in seen_queries_lower: - seen_queries_lower.add(q.lower()) - queries.append(q) - - # Cap at 5 queries to keep it fast - queries = queries[:5] - - return jsonify({'success': True, 'hints': queries}) - except Exception as e: - logger.error(f"Error getting staging hints: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 + payload, status = _import_staging_hints(_build_import_route_runtime()) + return jsonify(payload), status @app.route('/api/import/search/albums', methods=['GET']) @@ -34942,13 +34798,8 @@ def auto_import_clear_completed(): @app.route('/api/import/staging/suggestions', methods=['GET']) def import_staging_suggestions(): - """Return cached import suggestions. If cache isn't built yet, returns partial/empty with a flag.""" - cache = get_import_suggestions_cache() - return jsonify({ - 'success': True, - 'suggestions': cache['suggestions'], - 'ready': cache['built'], - }) + payload, status = _import_staging_suggestions() + return jsonify(payload), status # ================================================================================================