Add Auto-Fill fix handler for incomplete album findings

When the maintenance worker flags an incomplete album, users can now
click "Auto-Fill" to automatically locate missing tracks in the library,
move/copy them into the album folder, and apply full metadata enhancement
(MusicBrainz, Deezer, cover art, etc.). Singles are moved; tracks from
multi-track albums are copied. Quality gate prevents filling FLAC albums
with lossy files. Tracks not found in library are added to wishlist with
album context for auto-download.
pull/253/head
Broque Thomas 2 months ago
parent fbf44123ec
commit cde5754f0f

@ -125,10 +125,20 @@ class AlbumCompletenessJob(RepairJob):
for item in api_tracks['items']:
tn = item.get('track_number')
if tn and tn not in owned_numbers:
# Extract artist names from Spotify track data
track_artists = []
for a in item.get('artists', []):
if isinstance(a, dict):
track_artists.append(a.get('name', ''))
elif isinstance(a, str):
track_artists.append(a)
missing_tracks.append({
'track_number': tn,
'name': item.get('name', ''),
'disc_number': item.get('disc_number', 1),
'spotify_track_id': item.get('id', ''),
'duration_ms': item.get('duration_ms', 0),
'artists': track_artists,
})
except Exception as e:
logger.debug("Error getting album tracks for %s: %s", spotify_album_id, e)

@ -9,8 +9,11 @@ The worker is deactivated by default — the user must explicitly enable it.
import json
import os
import re
import shutil
import threading
import time
from difflib import SequenceMatcher
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
@ -105,6 +108,9 @@ class RepairWorker:
self._acoustid_client = None
self._metadata_cache = None
# Metadata enhancement callback (injected from web_server.py)
self._enhance_file_metadata = None
logger.info("Repair worker initialized (transfer_folder=%s)", self.transfer_folder)
# ------------------------------------------------------------------
@ -129,6 +135,14 @@ class RepairWorker:
if config_manager:
self.enabled = config_manager.get('repair.master_enabled', True)
def set_metadata_enhancer(self, enhance_fn):
    """Inject the metadata enhancement function from web_server.py.

    This is _enhance_file_metadata(file_path, context, artist, album_info)
    which handles full tag writing, source ID embedding, cover art, etc.

    Args:
        enhance_fn: Callable with the signature above. Stored for later use
            when placed tracks are retagged; if it is never injected (stays
            None), the worker falls back to basic tag writing instead.
    """
    self._enhance_file_metadata = enhance_fn
# ------------------------------------------------------------------
# Lazy client accessors
# ------------------------------------------------------------------
@ -781,6 +795,7 @@ class RepairWorker:
'duplicate_tracks': self._fix_duplicates,
'single_album_redundant': self._fix_single_album_redundant,
'mbid_mismatch': self._fix_mbid_mismatch,
'incomplete_album': self._fix_incomplete_album,
}
handler = handlers.get(finding_type)
if not handler:
@ -1089,6 +1104,501 @@ class RepairWorker:
except Exception as e:
return {'success': False, 'error': f'Failed to remove MBID: {str(e)}'}
# --- Album Completeness Auto-Fill ---
@staticmethod
def _quality_score(file_path, bitrate):
"""Return numeric quality score from file extension + bitrate.
Lossless formats (FLAC/WAV/ALAC/AIFF) 9999.
Lossy bitrate value (e.g. 320 for MP3-320).
"""
ext = os.path.splitext(file_path or '')[1].lstrip('.').upper() if file_path else ''
if ext in ('FLAC', 'WAV', 'ALAC', 'AIFF', 'AIF'):
return 9999
br = bitrate or 0
try:
return int(str(br).replace('k', '').replace('K', '').strip())
except (ValueError, TypeError):
return 0
@staticmethod
def _detect_filename_pattern(file_paths):
"""Detect naming convention from existing track filenames.
Returns a format string like '{num:02d} - {title}' or '{num} {title}'.
"""
patterns_found = {'dash': 0, 'dot': 0, 'space': 0, 'none': 0}
zero_padded = 0
total = 0
for fp in file_paths:
if not fp:
continue
basename = os.path.splitext(os.path.basename(fp))[0]
total += 1
# Check for leading number patterns
m = re.match(r'^(\d+)\s*[-–—]\s*(.+)', basename)
if m:
patterns_found['dash'] += 1
if m.group(1).startswith('0'):
zero_padded += 1
continue
m = re.match(r'^(\d+)\.\s*(.+)', basename)
if m:
patterns_found['dot'] += 1
if m.group(1).startswith('0'):
zero_padded += 1
continue
m = re.match(r'^(\d+)\s+(.+)', basename)
if m:
patterns_found['space'] += 1
if m.group(1).startswith('0'):
zero_padded += 1
continue
patterns_found['none'] += 1
pad = zero_padded > total / 2 if total else True
num_fmt = '{num:02d}' if pad else '{num}'
best = max(patterns_found, key=patterns_found.get)
if best == 'dash':
return num_fmt + ' - {title}'
elif best == 'dot':
return num_fmt + '. {title}'
elif best == 'space':
return num_fmt + ' {title}'
# Default
return '{num:02d} - {title}'
def _fix_incomplete_album(self, entity_type, entity_id, file_path, details):
    """Auto-fill an incomplete album by finding missing tracks in the library.

    For each missing track:
    1. Search library for matching tracks
    2. Quality gate — candidate must meet album's minimum quality
    3. Single source (1-track album) -> MOVE file; multi-track -> COPY
    4. Retag the file with correct album metadata
    5. If no candidate found or quality too low -> add to wishlist

    Note: entity_type, entity_id and file_path are part of the shared fix-
    handler signature but are unused here; everything is read from `details`.

    Args:
        details: Finding payload; requires 'album_id' and 'missing_tracks'
            (list of dicts with 'name', 'track_number', 'disc_number',
            'spotify_track_id', 'duration_ms', 'artists').

    Returns:
        Dict with 'success', 'action', 'message' and per-track counters
        ('fixed', 'wishlisted', 'skipped') plus a 'details' list.
    """
    album_id = details.get('album_id')
    missing_tracks = details.get('missing_tracks', [])
    album_title = details.get('album_title', 'Unknown Album')
    artist_name = details.get('artist', 'Unknown Artist')
    spotify_album_id = details.get('spotify_album_id', '')
    if not album_id or not missing_tracks:
        return {'success': False, 'error': 'Missing album_id or missing_tracks in finding details'}
    # Phase 1: Gather context from existing album tracks
    existing_tracks = self.db.get_tracks_by_album(int(album_id))
    if not existing_tracks:
        return {'success': False, 'error': 'No existing tracks found for this album — cannot determine album folder or quality'}
    # Compute quality floor from existing tracks (the worst existing track
    # sets the minimum acceptable quality for fills).
    quality_scores = [self._quality_score(t.file_path, t.bitrate) for t in existing_tracks]
    album_quality_floor = min(quality_scores) if quality_scores else 0
    # Infer album folder from existing track file paths
    download_folder = None
    if self._config_manager:
        download_folder = self._config_manager.get('soulseek.download_path', '')
    album_folder = None
    for t in existing_tracks:
        resolved = _resolve_file_path(t.file_path, self.transfer_folder, download_folder)
        if resolved and os.path.exists(resolved):
            # First track that resolves to a real file anchors the folder.
            album_folder = os.path.dirname(resolved)
            break
    if not album_folder:
        return {'success': False, 'error': 'Could not determine album folder from existing tracks'}
    # Detect filename pattern
    resolved_paths = []
    for t in existing_tracks:
        rp = _resolve_file_path(t.file_path, self.transfer_folder, download_folder)
        if rp:
            resolved_paths.append(rp)
    filename_pattern = self._detect_filename_pattern(resolved_paths)
    # Phase 2-4: Process each missing track
    fixed_count = 0
    wishlisted_count = 0
    skipped_count = 0
    track_details = []
    existing_track_ids = {t.id for t in existing_tracks}
    for mt in missing_tracks:
        track_name = mt.get('name', '')
        track_number = mt.get('track_number', 0)
        disc_number = mt.get('disc_number', 1)
        track_artists = mt.get('artists', [])
        spotify_track_id = mt.get('spotify_track_id', '')
        # Prefer the track's own first artist over the album artist.
        artist_search = track_artists[0] if track_artists else artist_name
        if not track_name:
            skipped_count += 1
            track_details.append({'track': track_name, 'status': 'skipped', 'reason': 'no track name'})
            continue
        # Search library for this track
        candidates = self.db.search_tracks(title=track_name, artist=artist_search, limit=20)
        # Filter: exclude tracks already in target album, require title similarity
        best_candidate = None
        best_score = -1  # any passing candidate (score >= 0) beats this
        for cand in candidates:
            if cand.id in existing_track_ids:
                continue
            if cand.album_id == int(album_id):
                continue
            # Fuzzy title match
            title_sim = SequenceMatcher(None, track_name.lower(), cand.title.lower()).ratio()
            if title_sim < 0.70:
                continue
            # Artist match (more lenient)
            cand_artist = getattr(cand, 'artist_name', '') or ''
            artist_sim = SequenceMatcher(None, artist_search.lower(), cand_artist.lower()).ratio()
            if artist_sim < 0.50:
                continue
            # Quality gate
            cand_quality = self._quality_score(cand.file_path, cand.bitrate)
            if cand_quality < album_quality_floor:
                continue
            # Score: prefer higher quality, then better title match
            # (quality * 1000 dominates, title_sim * 100 breaks ties).
            score = cand_quality * 1000 + title_sim * 100
            if score > best_score:
                best_score = score
                best_candidate = cand
        if best_candidate:
            # Phase 3: File operation
            result = self._perform_album_fill(
                best_candidate, album_id, album_title, artist_name,
                track_name, track_number, disc_number,
                album_folder, filename_pattern, download_folder
            )
            if result.get('success'):
                fixed_count += 1
                track_details.append({
                    'track': track_name,
                    'status': 'fixed',
                    'action': result.get('action', ''),
                    'message': result.get('message', '')
                })
                # Add the candidate ID to existing so we don't reuse it
                existing_track_ids.add(best_candidate.id)
                continue
            else:
                # File operation failed — fall through to wishlist
                logger.warning("File operation failed for '%s': %s", track_name, result.get('error'))
        # Phase 4: Wishlist fallback
        if spotify_track_id:
            try:
                wishlist_data = {
                    'id': spotify_track_id,
                    'name': track_name,
                    'artists': [{'name': a} for a in track_artists] if track_artists else [{'name': artist_name}],
                    'album': {'name': album_title},
                    'duration_ms': mt.get('duration_ms', 0),
                }
                source_info = {
                    'album_title': album_title,
                    'artist': artist_name,
                    'track_number': track_number,
                    'spotify_album_id': spotify_album_id,
                    'reason': 'album_completeness_auto_fill',
                }
                self.db.add_to_wishlist(
                    wishlist_data,
                    failure_reason='Missing from incomplete album',
                    source_type='album',
                    source_info=source_info,
                )
                wishlisted_count += 1
                # NOTE(review): when a candidate existed but the file operation
                # failed, the reason below reads 'quality too low', which is
                # slightly misleading — confirm whether a third label is wanted.
                track_details.append({
                    'track': track_name,
                    'status': 'wishlisted',
                    'reason': 'no suitable candidate in library' if not best_candidate else 'quality too low'
                })
            except Exception as e:
                logger.debug("Failed to add '%s' to wishlist: %s", track_name, e)
                skipped_count += 1
                track_details.append({'track': track_name, 'status': 'skipped', 'reason': f'wishlist error: {e}'})
        else:
            skipped_count += 1
            track_details.append({'track': track_name, 'status': 'skipped', 'reason': 'no spotify_track_id for wishlist'})
    # Build result message
    parts = []
    if fixed_count:
        parts.append(f'{fixed_count} track(s) filled')
    if wishlisted_count:
        parts.append(f'{wishlisted_count} added to wishlist')
    if skipped_count:
        parts.append(f'{skipped_count} skipped')
    # Precedence: the conditional covers the whole concatenation, so an
    # all-skipped run still yields a message (parts is non-empty then).
    message = f'Album "{album_title}": ' + ', '.join(parts) if parts else 'No tracks processed'
    success = fixed_count > 0 or wishlisted_count > 0
    return {
        'success': success,
        'action': 'auto_fill_album',
        'message': message,
        'fixed': fixed_count,
        'wishlisted': wishlisted_count,
        'skipped': skipped_count,
        'details': track_details,
    }
def _perform_album_fill(self, candidate, album_id, album_title, artist_name,
                        track_name, track_number, disc_number,
                        album_folder, filename_pattern, download_folder):
    """Move or copy a candidate track into the album folder and update DB.

    A candidate that is the only track of its album (a "single") is MOVED
    and its existing DB row repointed at the target album; a candidate from
    a multi-track album is COPIED and a new row inserted, so the source
    album stays intact.

    Returns:
        {'success': True, 'action': 'moved'|'copied', 'message': str} on
        success, or {'success': False, 'error': str} on any failure.
    """
    try:
        # Resolve source file
        src_path = _resolve_file_path(candidate.file_path, self.transfer_folder, download_folder)
        if not src_path or not os.path.exists(src_path):
            return {'success': False, 'error': f'Source file not found: {candidate.file_path}'}
        # Determine source type: single (1-track album) vs multi-track
        source_album_tracks = self.db.get_tracks_by_album(candidate.album_id)
        is_single_source = len(source_album_tracks) <= 1
        # Build target filename
        src_ext = os.path.splitext(src_path)[1]  # e.g. '.flac'
        # Sanitize title for filesystem
        safe_title = re.sub(r'[<>:"/\\|?*]', '', track_name).strip()
        # filename_pattern comes from _detect_filename_pattern, e.g. '{num:02d} - {title}'.
        target_name = filename_pattern.format(num=track_number, title=safe_title) + src_ext
        target_path = os.path.join(album_folder, target_name)
        # Avoid overwriting existing files
        if os.path.exists(target_path):
            return {'success': False, 'error': f'Target file already exists: {target_path}'}
        # Ensure album folder exists
        os.makedirs(album_folder, exist_ok=True)
        conn = None
        try:
            if is_single_source:
                # MOVE: relocate file and update DB record
                shutil.move(src_path, target_path)
                action = 'moved'
                # Update existing DB record to point to new album and path
                conn = self.db._get_connection()
                cursor = conn.cursor()
                # Get the target album's artist_id for consistency
                cursor.execute("SELECT artist_id FROM tracks WHERE album_id = ? LIMIT 1", (album_id,))
                artist_row = cursor.fetchone()
                target_artist_id = artist_row[0] if artist_row else candidate.artist_id
                cursor.execute("""
                    UPDATE tracks
                    SET album_id = ?, artist_id = ?, title = ?,
                        file_path = ?, track_number = ?,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (album_id, target_artist_id, track_name,
                      target_path, track_number, candidate.id))
                # Clean up the source single's album if it's now empty
                cursor.execute("SELECT COUNT(*) FROM tracks WHERE album_id = ?", (candidate.album_id,))
                remaining = cursor.fetchone()[0]
                if remaining == 0:
                    cursor.execute("DELETE FROM albums WHERE id = ?", (candidate.album_id,))
                conn.commit()
                # Clean up empty source directories
                self._cleanup_empty_dirs(os.path.dirname(src_path))
            else:
                # COPY: duplicate file, create new DB record
                shutil.copy2(src_path, target_path)
                action = 'copied'
                conn = self.db._get_connection()
                cursor = conn.cursor()
                # Get artist_id from existing album tracks
                cursor.execute("SELECT artist_id FROM tracks WHERE album_id = ? LIMIT 1", (album_id,))
                artist_row = cursor.fetchone()
                target_artist_id = artist_row[0] if artist_row else candidate.artist_id
                # NOTE(review): disc_number is only written to file tags later,
                # not persisted here — confirm the tracks table has no disc column.
                cursor.execute("""
                    INSERT INTO tracks (album_id, artist_id, title, track_number, duration,
                                        file_path, bitrate, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
                """, (album_id, target_artist_id, track_name, track_number,
                      candidate.duration, target_path, candidate.bitrate))
                conn.commit()
        finally:
            if conn:
                conn.close()
        # Enhance the file with full metadata pipeline (same as fresh downloads)
        # Clears existing tags, writes standard + source IDs, embeds cover art
        self._enhance_placed_track(
            target_path, album_id, album_title, artist_name,
            track_name, track_number, disc_number
        )
        return {
            'success': True,
            'action': action,
            'message': f'{action.title()} "{track_name}" from {"single" if is_single_source else "compilation"}'
        }
    except Exception as e:
        logger.error("Error filling track '%s': %s", track_name, e, exc_info=True)
        return {'success': False, 'error': str(e)}
def _cleanup_empty_dirs(self, directory):
"""Remove empty parent directories up to 3 levels, never removing transfer folder."""
if not directory:
return
transfer_norm = os.path.normpath(self.transfer_folder)
parent = directory
for _ in range(3):
if (parent and os.path.isdir(parent)
and os.path.normpath(parent) != transfer_norm
and not os.listdir(parent)):
try:
os.rmdir(parent)
except OSError:
break
parent = os.path.dirname(parent)
else:
break
def _enhance_placed_track(self, file_path, album_id, album_title, artist_name,
                          track_name, track_number, disc_number):
    """Run full metadata enhancement on a placed track.

    Uses the injected _enhance_file_metadata from web_server.py (same
    pipeline as fresh downloads) — clears tags, writes standard metadata,
    embeds source IDs from MusicBrainz/Deezer/etc., and embeds cover art.
    Falls back to basic tag_writer if the enhancer isn't available.

    Never raises: every failure path only logs, since the file is already
    placed and the fill itself has succeeded.
    """
    # Fetch album metadata from DB for building synthetic context
    album_year = None
    album_genres = []
    album_thumb = None
    album_track_count = None
    spotify_album_id = None
    conn_meta = None
    try:
        conn_meta = self.db._get_connection()
        cursor_meta = conn_meta.cursor()
        cursor_meta.execute(
            "SELECT year, genres, thumb_url, track_count, spotify_album_id FROM albums WHERE id = ?",
            (album_id,)
        )
        album_row = cursor_meta.fetchone()
        if album_row:
            album_year = album_row[0]
            if album_row[1]:
                # genres column is stored as a JSON-encoded list.
                try:
                    parsed = json.loads(album_row[1])
                    if isinstance(parsed, list):
                        album_genres = parsed
                except (json.JSONDecodeError, TypeError):
                    pass
            album_thumb = album_row[2]
            album_track_count = album_row[3]
            # Defensive: older rows may predate the spotify_album_id column.
            spotify_album_id = album_row[4] if len(album_row) > 4 else None
    except Exception:
        # Metadata lookup is best-effort; defaults above are used instead.
        pass
    finally:
        if conn_meta:
            conn_meta.close()
    # Try full enhancement pipeline if available AND enabled in config
    # _enhance_file_metadata returns True without writing when enhancement is disabled,
    # so we must check the config ourselves to avoid skipping the basic fallback
    enhancement_enabled = (
        self._enhance_file_metadata is not None
        and self._config_manager
        and self._config_manager.get('metadata_enhancement.enabled', True)
    )
    if enhancement_enabled:
        try:
            # Build synthetic context dicts (same pattern as _execute_retag in web_server.py)
            context = {
                'original_search_result': {
                    'spotify_clean_title': track_name,
                    'title': track_name,
                    'disc_number': disc_number,
                    'artists': [{'name': artist_name}],
                },
                'spotify_album': {
                    'id': spotify_album_id or '',
                    'name': album_title,
                    'release_date': str(album_year) if album_year else '',
                    'total_tracks': album_track_count or 1,
                    'image_url': album_thumb or '',
                },
                'track_info': {
                    'id': '',  # No specific track ID available
                },
            }
            artist = {
                'name': artist_name,
                'id': '',
                # Cap at two genres — presumably what the enhancer expects;
                # TODO confirm against web_server.py.
                'genres': album_genres[:2] if album_genres else [],
            }
            album_info = {
                'is_album': True,
                'album_name': album_title,
                'track_number': track_number,
                'total_tracks': album_track_count or 1,
                'disc_number': disc_number,
                'clean_track_name': track_name,
                'album_image_url': album_thumb or '',
            }
            result = self._enhance_file_metadata(file_path, context, artist, album_info)
            if result:
                logger.info("Full metadata enhancement applied to '%s'", track_name)
                return
            else:
                logger.warning("Full enhancement returned False for '%s', falling back to basic tags", track_name)
        except Exception as e:
            logger.warning("Full enhancement failed for '%s': %s — falling back to basic tags", track_name, e)
    # Fallback: basic tag writer (title, artist, album, track#, disc#, year, genre, cover art)
    # Used when: enhancer not injected, metadata enhancement disabled, or enhancer failed
    try:
        from core.tag_writer import write_tags_to_file
        tag_data = {
            'title': track_name,
            'artist': artist_name,
            'album_artist': artist_name,
            'album': album_title,
            'track_number': track_number,
            'disc_number': disc_number,
        }
        if album_year:
            tag_data['year'] = album_year
        if album_genres:
            tag_data['genre'] = ', '.join(album_genres[:5])
        if album_track_count:
            tag_data['total_tracks'] = album_track_count
        write_tags_to_file(file_path, tag_data,
                           embed_cover=bool(album_thumb),
                           cover_url=album_thumb)
        logger.info("Basic tag enhancement applied to '%s'", track_name)
    except Exception as e:
        logger.warning("Retagging failed for '%s' (file still placed): %s", file_path, e)
def dismiss_finding(self, finding_id: int) -> bool:
"""Dismiss a finding."""
conn = None
@ -1120,7 +1630,8 @@ class RepairWorker:
# Build query for pending fixable findings
fixable_types = ('dead_file', 'orphan_file', 'track_number_mismatch',
'missing_cover_art', 'metadata_gap', 'duplicate_tracks', 'mbid_mismatch')
'missing_cover_art', 'metadata_gap', 'duplicate_tracks', 'mbid_mismatch',
'incomplete_album')
placeholders = ','.join(['?'] * len(fixable_types))
where_parts = [f"finding_type IN ({placeholders})", "status = 'pending'"]
params = list(fixable_types)

@ -39796,6 +39796,7 @@ try:
transfer_path = docker_resolve_path(config_manager.get('soulseek.transfer_path', './Transfer'))
repair_worker = RepairWorker(database=repair_db, transfer_folder=transfer_path)
repair_worker.set_config_manager(config_manager)
repair_worker.set_metadata_enhancer(_enhance_file_metadata)
# --- Repair Job Progress Tracking (live progress like automation cards) ---
repair_job_progress_states = {} # job_id (str) -> state dict

@ -52659,6 +52659,7 @@ async function loadRepairFindings() {
missing_cover_art: 'Apply Art',
metadata_gap: 'Apply',
duplicate_tracks: 'Keep Best',
incomplete_album: 'Auto-Fill',
};
container.innerHTML = items.map(f => {

Loading…
Cancel
Save