Merge pull request #580 from Nezreka/refactor/import-routes-extraction

Extract import staging route helpers
pull/582/head
BoulderBadgeDad 2 weeks ago committed by GitHub
commit 2a2ffaa192
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,186 @@
"""Import/staging controller helpers for Flask-style endpoints."""
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Any, Callable, Dict
from core.imports.staging import (
AUDIO_EXTENSIONS,
get_import_suggestions_cache,
get_staging_path as _get_staging_path,
read_staging_file_metadata as _read_staging_file_metadata,
)
from utils.logging_config import get_logger
module_logger = get_logger("imports.routes")
def _default_read_tags(file_path: str):
from mutagen import File as MutagenFile
return MutagenFile(file_path, easy=True)
@dataclass
class ImportRouteRuntime:
"""Dependencies needed to service import/staging HTTP endpoints."""
get_staging_path: Callable[[], str] = _get_staging_path
read_staging_file_metadata: Callable[[str, str], Dict[str, Any]] = _read_staging_file_metadata
read_tags: Callable[[str], Any] = _default_read_tags
logger: Any = module_logger
def staging_files(runtime: ImportRouteRuntime) -> tuple[Dict[str, Any], int]:
"""Scan the staging folder and return audio files with tag metadata."""
try:
staging_path = runtime.get_staging_path()
os.makedirs(staging_path, exist_ok=True)
files = []
for root, _dirs, filenames in os.walk(staging_path):
for fname in filenames:
ext = os.path.splitext(fname)[1].lower()
if ext not in AUDIO_EXTENSIONS:
continue
full_path = os.path.join(root, fname)
rel_path = os.path.relpath(full_path, staging_path)
meta = runtime.read_staging_file_metadata(full_path, rel_path)
files.append(
{
"filename": fname,
"rel_path": rel_path,
"full_path": full_path,
"title": meta["title"],
"artist": meta["albumartist"] or meta["artist"] or "Unknown Artist",
"album": meta["album"],
"track_number": meta["track_number"],
"disc_number": meta["disc_number"],
"extension": ext,
}
)
files.sort(key=lambda f: f["filename"].lower())
return {"success": True, "files": files, "staging_path": staging_path}, 200
except Exception as exc:
runtime.logger.error("Error scanning staging files: %s", exc)
return {"success": False, "error": str(exc)}, 500
def staging_groups(runtime: ImportRouteRuntime) -> tuple[Dict[str, Any], int]:
"""Auto-detect album groups from staging files based on their tags."""
try:
staging_path = runtime.get_staging_path()
if not os.path.isdir(staging_path):
return {"success": True, "groups": []}, 200
album_groups = {}
for root, _dirs, filenames in os.walk(staging_path):
for fname in filenames:
ext = os.path.splitext(fname)[1].lower()
if ext not in AUDIO_EXTENSIONS:
continue
full_path = os.path.join(root, fname)
rel_path = os.path.relpath(full_path, staging_path)
meta = runtime.read_staging_file_metadata(full_path, rel_path)
album = meta["album"]
artist = meta["albumartist"] or meta["artist"]
if not album or not artist:
continue
key = (album.lower().strip(), artist.lower().strip())
if key not in album_groups:
album_groups[key] = {"album": album.strip(), "artist": artist.strip(), "files": []}
album_groups[key]["files"].append(
{
"filename": fname,
"full_path": full_path,
"title": meta["title"],
"track_number": meta["track_number"],
}
)
groups = []
for group in album_groups.values():
if len(group["files"]) >= 2:
group["files"].sort(key=lambda f: f.get("track_number") or 999)
groups.append(
{
"album": group["album"],
"artist": group["artist"],
"file_count": len(group["files"]),
"files": group["files"],
"file_paths": [f["full_path"] for f in group["files"]],
}
)
groups.sort(key=lambda g: g["file_count"], reverse=True)
return {"success": True, "groups": groups}, 200
except Exception as exc:
runtime.logger.error("Error building staging groups: %s", exc)
return {"success": False, "error": str(exc)}, 500
def staging_hints(runtime: ImportRouteRuntime) -> tuple[Dict[str, Any], int]:
"""Extract album search hints from staging folder tags and folder names."""
try:
staging_path = runtime.get_staging_path()
if not os.path.isdir(staging_path):
return {"success": True, "hints": []}, 200
tag_albums = {}
folder_hints = {}
for root, _dirs, filenames in os.walk(staging_path):
audio_files = [f for f in filenames if os.path.splitext(f)[1].lower() in AUDIO_EXTENSIONS]
if not audio_files:
continue
rel_dir = os.path.relpath(root, staging_path)
if rel_dir != ".":
top_folder = rel_dir.split(os.sep)[0]
folder_hints[top_folder] = folder_hints.get(top_folder, 0) + len(audio_files)
for fname in audio_files:
full_path = os.path.join(root, fname)
try:
tags = runtime.read_tags(full_path)
if tags:
album = (tags.get("album") or [None])[0]
artist = (tags.get("artist") or (tags.get("albumartist") or [None]))[0]
if album:
key = (album.strip(), (artist or "").strip())
tag_albums[key] = tag_albums.get(key, 0) + 1
except Exception as exc:
runtime.logger.debug("tag read failed: %s", exc)
queries = []
seen_queries_lower = set()
for (album, artist), _count in sorted(tag_albums.items(), key=lambda x: -x[1]):
query = f"{album} {artist}".strip() if artist else album
if query.lower() not in seen_queries_lower:
seen_queries_lower.add(query.lower())
queries.append(query)
for folder, _count in sorted(folder_hints.items(), key=lambda x: -x[1]):
query = folder.replace("_", " ")
if query.lower() not in seen_queries_lower:
seen_queries_lower.add(query.lower())
queries.append(query)
return {"success": True, "hints": queries[:5]}, 200
except Exception as exc:
runtime.logger.error("Error getting staging hints: %s", exc)
return {"success": False, "error": str(exc)}, 500
def staging_suggestions() -> tuple[Dict[str, Any], int]:
"""Return cached import suggestions and readiness state."""
cache = get_import_suggestions_cache()
return {"success": True, "suggestions": cache["suggestions"], "ready": cache["built"]}, 200

@ -0,0 +1,230 @@
import os
import core.imports.routes as import_routes
from core.imports.routes import ImportRouteRuntime, staging_files, staging_groups, staging_hints, staging_suggestions
class _FakeLogger:
def __init__(self):
self.debug_messages = []
self.error_messages = []
def debug(self, msg, *args):
self.debug_messages.append(msg % args if args else msg)
def error(self, msg, *args):
self.error_messages.append(msg % args if args else msg)
def _touch(path):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(b"")
def _metadata_for(files):
def _read_metadata(file_path, rel_path):
return files[rel_path]
return _read_metadata
def test_staging_files_returns_audio_files_with_metadata(tmp_path):
_touch(tmp_path / "Artist" / "02 - Song.flac")
_touch(tmp_path / "cover.jpg")
rel_song = os.path.join("Artist", "02 - Song.flac")
metadata = {
rel_song: {
"title": "Song",
"artist": "Track Artist",
"albumartist": "Album Artist",
"album": "Album",
"track_number": 2,
"disc_number": 1,
}
}
runtime = ImportRouteRuntime(
get_staging_path=lambda: str(tmp_path),
read_staging_file_metadata=_metadata_for(metadata),
logger=_FakeLogger(),
)
payload, status = staging_files(runtime)
assert status == 200
assert payload["success"] is True
assert payload["staging_path"] == str(tmp_path)
assert len(payload["files"]) == 1
assert payload["files"] == [
{
"filename": "02 - Song.flac",
"rel_path": rel_song,
"full_path": str(tmp_path / "Artist" / "02 - Song.flac"),
"title": "Song",
"artist": "Album Artist",
"album": "Album",
"track_number": 2,
"disc_number": 1,
"extension": ".flac",
}
]
def test_staging_groups_only_returns_multi_file_album_groups(tmp_path):
_touch(tmp_path / "a.mp3")
_touch(tmp_path / "b.mp3")
_touch(tmp_path / "single.mp3")
metadata = {
"a.mp3": {
"title": "A",
"artist": "Artist",
"albumartist": "",
"album": "Album",
"track_number": 2,
"disc_number": 1,
},
"b.mp3": {
"title": "B",
"artist": "Artist",
"albumartist": "",
"album": "Album",
"track_number": 1,
"disc_number": 1,
},
"single.mp3": {
"title": "Single",
"artist": "Other",
"albumartist": "",
"album": "Other Album",
"track_number": 1,
"disc_number": 1,
},
}
runtime = ImportRouteRuntime(
get_staging_path=lambda: str(tmp_path),
read_staging_file_metadata=_metadata_for(metadata),
logger=_FakeLogger(),
)
payload, status = staging_groups(runtime)
assert status == 200
assert payload["success"] is True
assert len(payload["groups"]) == 1
group = payload["groups"][0]
assert group["album"] == "Album"
assert group["artist"] == "Artist"
assert group["file_count"] == 2
assert [f["filename"] for f in group["files"]] == ["b.mp3", "a.mp3"]
def test_staging_hints_prefers_tag_queries_then_folder_queries(tmp_path):
_touch(tmp_path / "Folder_Album" / "01.mp3")
_touch(tmp_path / "Folder_Album" / "02.mp3")
_touch(tmp_path / "Loose" / "track.flac")
def _read_tags(file_path):
if file_path.endswith("01.mp3") or file_path.endswith("02.mp3"):
return {"album": ["Tagged Album"], "artist": ["Tagged Artist"]}
return {}
runtime = ImportRouteRuntime(
get_staging_path=lambda: str(tmp_path),
read_tags=_read_tags,
logger=_FakeLogger(),
)
payload, status = staging_hints(runtime)
assert status == 200
assert payload == {
"success": True,
"hints": ["Tagged Album Tagged Artist", "Folder Album", "Loose"],
}
def test_staging_suggestions_returns_cache_payload(monkeypatch):
monkeypatch.setattr(
import_routes,
"get_import_suggestions_cache",
lambda: {"suggestions": [{"album": "Album"}], "built": True},
)
payload, status = staging_suggestions()
assert status == 200
assert payload == {
"success": True,
"suggestions": [{"album": "Album"}],
"ready": True,
}
def test_staging_groups_returns_empty_for_missing_staging_path(tmp_path):
runtime = ImportRouteRuntime(
get_staging_path=lambda: str(tmp_path / "missing"),
logger=_FakeLogger(),
)
payload, status = staging_groups(runtime)
assert status == 200
assert payload == {"success": True, "groups": []}
def test_staging_hints_returns_empty_for_missing_staging_path(tmp_path):
runtime = ImportRouteRuntime(
get_staging_path=lambda: str(tmp_path / "missing"),
logger=_FakeLogger(),
)
payload, status = staging_hints(runtime)
assert status == 200
assert payload == {"success": True, "hints": []}
def test_staging_files_returns_error_when_path_resolution_fails():
logger = _FakeLogger()
runtime = ImportRouteRuntime(
get_staging_path=lambda: (_ for _ in ()).throw(RuntimeError("path boom")),
logger=logger,
)
payload, status = staging_files(runtime)
assert status == 500
assert payload["success"] is False
assert payload["error"] == "path boom"
assert logger.error_messages == ["Error scanning staging files: path boom"]
def test_staging_groups_returns_error_when_metadata_read_fails(tmp_path):
_touch(tmp_path / "a.mp3")
logger = _FakeLogger()
runtime = ImportRouteRuntime(
get_staging_path=lambda: str(tmp_path),
read_staging_file_metadata=lambda _file_path, _rel_path: (_ for _ in ()).throw(RuntimeError("tag boom")),
logger=logger,
)
payload, status = staging_groups(runtime)
assert status == 500
assert payload["success"] is False
assert payload["error"] == "tag boom"
assert logger.error_messages == ["Error building staging groups: tag boom"]
def test_staging_hints_returns_error_when_path_resolution_fails():
logger = _FakeLogger()
runtime = ImportRouteRuntime(
get_staging_path=lambda: (_ for _ in ()).throw(RuntimeError("hint boom")),
logger=logger,
)
payload, status = staging_hints(runtime)
assert status == 500
assert payload["success"] is False
assert payload["error"] == "hint boom"
assert logger.error_messages == ["Error getting staging hints: hint boom"]

@ -169,7 +169,6 @@ from core.imports.album import (
from core.imports.album_naming import resolve_album_group as _resolve_album_group
from core.imports.filename import extract_track_number_from_filename, parse_filename_metadata
from core.imports.staging import (
get_import_suggestions_cache,
get_primary_source,
get_staging_path,
read_staging_file_metadata,
@ -178,6 +177,11 @@ from core.imports.staging import (
search_import_tracks,
start_import_suggestions_cache,
)
from core.imports.routes import ImportRouteRuntime as _ImportRouteRuntime
from core.imports.routes import staging_files as _import_staging_files
from core.imports.routes import staging_groups as _import_staging_groups
from core.imports.routes import staging_hints as _import_staging_hints
from core.imports.routes import staging_suggestions as _import_staging_suggestions
from core.imports.paths import build_final_path_for_track as _build_final_path_for_track
from core.imports.pipeline import build_import_pipeline_runtime as _build_import_pipeline_runtime
from core.metadata.common import get_file_lock
@ -34264,174 +34268,26 @@ def repair_job_progress():
# IMPORT / STAGING SYSTEM
# ================================================================================================
AUDIO_EXTENSIONS = {'.mp3', '.flac', '.ogg', '.opus', '.m4a', '.aac', '.wav', '.wma', '.aiff', '.aif', '.ape'}
def _build_import_route_runtime():
return _ImportRouteRuntime(logger=logger)
@app.route('/api/import/staging/files', methods=['GET'])
def import_staging_files():
"""Scan the staging folder and return audio files with tag metadata."""
try:
staging_path = get_staging_path()
os.makedirs(staging_path, exist_ok=True)
files = []
for root, _dirs, filenames in os.walk(staging_path):
for fname in filenames:
ext = os.path.splitext(fname)[1].lower()
if ext not in AUDIO_EXTENSIONS:
continue
full_path = os.path.join(root, fname)
rel_path = os.path.relpath(full_path, staging_path)
meta = read_staging_file_metadata(full_path, rel_path)
files.append({
'filename': fname,
'rel_path': rel_path,
'full_path': full_path,
'title': meta['title'],
'artist': meta['albumartist'] or meta['artist'] or 'Unknown Artist',
'album': meta['album'],
'track_number': meta['track_number'],
'disc_number': meta['disc_number'],
'extension': ext
})
# Sort by filename
files.sort(key=lambda f: f['filename'].lower())
return jsonify({'success': True, 'files': files, 'staging_path': staging_path})
except Exception as e:
logger.error(f"Error scanning staging files: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
payload, status = _import_staging_files(_build_import_route_runtime())
return jsonify(payload), status
@app.route('/api/import/staging/groups', methods=['GET'])
def import_staging_groups():
"""Auto-detect album groups from staging files based on their tags.
Groups files by (album_tag, artist) where both are non-empty and at least 2 files share
the same album+artist combo. Returns groups sorted by file count descending.
"""
try:
staging_path = get_staging_path()
if not os.path.isdir(staging_path):
return jsonify({'success': True, 'groups': []})
# Scan files and group by album+artist tags
album_groups = {} # (album_lower, artist_lower) -> {album, artist, files: []}
for root, _dirs, filenames in os.walk(staging_path):
for fname in filenames:
ext = os.path.splitext(fname)[1].lower()
if ext not in AUDIO_EXTENSIONS:
continue
full_path = os.path.join(root, fname)
rel_path = os.path.relpath(full_path, staging_path)
meta = read_staging_file_metadata(full_path, rel_path)
album = meta['album']
artist = meta['albumartist'] or meta['artist']
if not album or not artist:
continue
key = (album.lower().strip(), artist.lower().strip())
if key not in album_groups:
album_groups[key] = {
'album': album.strip(),
'artist': artist.strip(),
'files': []
}
album_groups[key]['files'].append({
'filename': fname,
'full_path': full_path,
'title': meta['title'],
'track_number': meta['track_number'],
})
# Only return groups with 2+ files
groups = []
for group in album_groups.values():
if len(group['files']) >= 2:
group['files'].sort(key=lambda f: f.get('track_number') or 999)
groups.append({
'album': group['album'],
'artist': group['artist'],
'file_count': len(group['files']),
'files': group['files'],
'file_paths': [f['full_path'] for f in group['files']],
})
# Sort by file count descending
groups.sort(key=lambda g: g['file_count'], reverse=True)
return jsonify({'success': True, 'groups': groups})
except Exception as e:
logger.error(f"Error building staging groups: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
payload, status = _import_staging_groups(_build_import_route_runtime())
return jsonify(payload), status
@app.route('/api/import/staging/hints', methods=['GET'])
def import_staging_hints():
"""Extract album search hints from staging folder (tags + folder names). Fast — no Spotify calls."""
try:
staging_path = get_staging_path()
if not os.path.isdir(staging_path):
return jsonify({'success': True, 'hints': []})
# Collect hints from tags and folder structure
tag_albums = {} # (album, artist) -> file count
folder_hints = {} # subfolder name -> file count
for root, _dirs, filenames in os.walk(staging_path):
audio_files = [f for f in filenames if os.path.splitext(f)[1].lower() in AUDIO_EXTENSIONS]
if not audio_files:
continue
# Folder-based hint: use immediate subfolder name relative to staging
rel_dir = os.path.relpath(root, staging_path)
if rel_dir != '.':
# Use the top-level subfolder as the hint
top_folder = rel_dir.split(os.sep)[0]
folder_hints[top_folder] = folder_hints.get(top_folder, 0) + len(audio_files)
# Tag-based hints
for fname in audio_files:
full_path = os.path.join(root, fname)
try:
from mutagen import File as MutagenFile
tags = MutagenFile(full_path, easy=True)
if tags:
album = (tags.get('album') or [None])[0]
artist = (tags.get('artist') or (tags.get('albumartist') or [None]))[0]
if album:
key = (album.strip(), (artist or '').strip())
tag_albums[key] = tag_albums.get(key, 0) + 1
except Exception as e:
logger.debug("tag read failed: %s", e)
# Build search queries, prioritizing tag-based hints (more specific)
queries = []
seen_queries_lower = set()
# Tag-based: sort by file count descending
for (album, artist), _count in sorted(tag_albums.items(), key=lambda x: -x[1]):
q = f"{album} {artist}".strip() if artist else album
if q.lower() not in seen_queries_lower:
seen_queries_lower.add(q.lower())
queries.append(q)
# Folder-based: parse "Artist - Album" pattern or use as-is
for folder, _count in sorted(folder_hints.items(), key=lambda x: -x[1]):
q = folder.replace('_', ' ')
if q.lower() not in seen_queries_lower:
seen_queries_lower.add(q.lower())
queries.append(q)
# Cap at 5 queries to keep it fast
queries = queries[:5]
return jsonify({'success': True, 'hints': queries})
except Exception as e:
logger.error(f"Error getting staging hints: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
payload, status = _import_staging_hints(_build_import_route_runtime())
return jsonify(payload), status
@app.route('/api/import/search/albums', methods=['GET'])
@ -34942,13 +34798,8 @@ def auto_import_clear_completed():
@app.route('/api/import/staging/suggestions', methods=['GET'])
def import_staging_suggestions():
"""Return cached import suggestions. If cache isn't built yet, returns partial/empty with a flag."""
cache = get_import_suggestions_cache()
return jsonify({
'success': True,
'suggestions': cache['suggestions'],
'ready': cache['built'],
})
payload, status = _import_staging_suggestions()
return jsonify(payload), status
# ================================================================================================

Loading…
Cancel
Save