# SoulSync/core/worker_utils.py
"""Shared helpers for background workers."""
import logging
import re
import threading
from difflib import SequenceMatcher
from typing import Optional
logger = logging.getLogger(__name__)
# Artist-match acceptance gate; the default ``threshold`` of
# artist_name_matches() and accept_artist_match(). Stricter than the 0.80 each
# worker uses for album/track titles: artist names are short, so 0.80 lets
# distinct artists slip through ("ODESZA"/"odessa", "Blance"/"Blanke",
# "Lady A"/"Lady Gaga" all score 0.80-0.83). 0.85 rejects those while still
# tolerating real variation that survives normalization.
ARTIST_NAME_MATCH_THRESHOLD = 0.85
# Whitelist of artist source-id columns we'll interpolate into SQL — guards the
# conflict query against any unexpected column name. source_id_conflict()
# returns None without querying for any column that is not listed here.
_ARTIST_ID_COLUMNS = frozenset({
'deezer_id', 'spotify_artist_id', 'itunes_artist_id', 'musicbrainz_id',
'discogs_id', 'audiodb_id', 'qobuz_id', 'tidal_id', 'amazon_id', 'soul_id',
})
def normalize_artist_name(name: str) -> str:
    """Reduce an artist name to a canonical form for comparison.

    Applied in order: lowercase and trim; cut a trailing ``' - ...'`` suffix
    (hyphen, en dash or em dash surrounded by whitespace); replace
    parenthesised segments with a space; remove every character that is
    neither a word character nor whitespace; collapse whitespace runs.

    A falsy ``name`` (``None`` or ``''``) yields ``''``.
    """
    rewrites = (
        (r'\s+[-–—]\s+.*$', ''),   # trailing " - suffix"
        (r'\s*\(.*?\)\s*', ' '),   # parentheticals
        (r'[^\w\s]', ''),          # punctuation
        (r'\s+', ' '),             # whitespace runs
    )
    cleaned = (name or '').lower().strip()
    for pattern, replacement in rewrites:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
def artist_name_matches(query: str, result: str,
                        threshold: float = ARTIST_NAME_MATCH_THRESHOLD) -> bool:
    """Report whether two artist names are similar enough to be the same artist.

    Both names are passed through ``normalize_artist_name`` and compared with
    ``SequenceMatcher``; the match succeeds when the ratio is at or above
    ``threshold``. If either name normalizes to an empty string the result is
    always False.
    """
    normalized_query = normalize_artist_name(query)
    normalized_result = normalize_artist_name(result)
    if normalized_query and normalized_result:
        similarity = SequenceMatcher(None, normalized_query, normalized_result).ratio()
        return similarity >= threshold
    return False
def _names_equivalent(a: str, b: str) -> bool:
    """Return True when ``a`` and ``b`` normalize to the same artist name."""
    left = normalize_artist_name(a)
    right = normalize_artist_name(b)
    return left == right
def source_id_conflict(database, id_column: str, source_id, artist_id,
                       artist_name: str) -> Optional[str]:
    """Return the name of a DIFFERENTLY-named library artist that already holds
    ``source_id`` in ``id_column``, or None.

    A same-named holder (the same artist indexed on two media servers) is NOT a
    conflict — both legitimately share the id. Only a different artist holding
    the id signals the kind of corruption where one source id gets smeared
    across unrelated artists.

    Args:
        database: object exposing ``_get_connection()``, used as a context
            manager that yields a connection with ``execute()``.
        id_column: artists-table column to check; must be one of
            ``_ARTIST_ID_COLUMNS`` because it is interpolated into the SQL.
        source_id: candidate id; compared as ``str(source_id)``.
        artist_id: id of the artist being enriched (excluded from the lookup).
        artist_name: name of the artist being enriched.

    Returns:
        The conflicting holder's name, or None when there is no conflict, the
        id is empty, the column is not whitelisted, or the lookup failed.
    """
    if source_id in (None, ''):
        return None
    if id_column not in _ARTIST_ID_COLUMNS:
        logger.debug("source_id_conflict: refusing unknown column %r", id_column)
        return None
    try:
        with database._get_connection() as conn:
            rows = conn.execute(
                f"SELECT name FROM artists WHERE {id_column} = ? AND id != ?",
                (str(source_id), artist_id),
            ).fetchall()
    except Exception as e:
        # Best-effort check: a failed lookup is treated as "no conflict".
        logger.debug(
            "source_id_conflict check failed for %s=%s: %s", id_column, source_id, e
        )
        return None
    for (other_name,) in rows:
        if not other_name:
            # A nameless holder cannot be reported: returning its empty/NULL
            # name would look like "no conflict" to callers and would also
            # hide any named holder further down the result set.
            continue
        if not _names_equivalent(artist_name, other_name):
            return other_name
    return None
def accept_artist_match(database, id_column: str, source_id, artist_id,
                        query_name: str, result_name: str,
                        threshold: float = ARTIST_NAME_MATCH_THRESHOLD) -> tuple:
    """Decide whether to store ``source_id`` on an artist.

    Returns ``(ok: bool, reason: str)``. Accepts only when the result's name
    matches the library artist at/above ``threshold`` AND the id isn't already
    claimed by a differently-named artist. ``reason`` explains a rejection (for
    debug logging) and is ``""`` on acceptance. This is the single gate every
    worker's artist match should pass through, so the 'one id smeared across
    many artists' bug can't recur.

    Args:
        database, id_column, source_id, artist_id: forwarded unchanged to
            ``source_id_conflict``.
        query_name: the library artist's name.
        result_name: the name returned by the metadata source.
        threshold: minimum similarity passed to ``artist_name_matches``.
    """
    if not artist_name_matches(query_name, result_name, threshold):
        return False, (
            f"name mismatch '{query_name}' vs '{result_name}' (< {threshold})"
        )
    conflict = source_id_conflict(database, id_column, source_id, artist_id, query_name)
    if conflict:
        # The two fragments are joined by implicit concatenation, so the
        # separator has to live inside the first one.
        return False, (
            f"{id_column}={source_id} already claimed by '{conflict}' — "
            f"skipping to avoid a shared/duplicate id"
        )
    return True, ""
# --- Same-name artist disambiguation by owned-catalog overlap -------------
# The name gate (above) can't separate two artists who share a name ("Rone" has
# ~5). The decisive signal is the library itself: the user owns albums by the
# RIGHT one. So when several candidates clear the name gate, fetch each one's
# catalog and pick the one whose releases overlap the albums actually owned.
def normalize_release_title(title: str) -> str:
    """Reduce an album/release title to a canonical form for tolerant matching.

    Applied in order: lowercase and trim; replace ``(...)`` and ``[...]``
    segments (edition tags such as '(Deluxe)') with a space; cut a trailing
    ``' - ...'`` suffix (e.g. ' - Remastered'); remove punctuation; collapse
    whitespace runs.

    A falsy ``title`` (``None`` or ``''``) yields ``''``.
    """
    rewrites = (
        (r'\s*\(.*?\)\s*', ' '),   # parenthesised edition tags
        (r'\s*\[.*?\]\s*', ' '),   # bracketed edition tags
        (r'\s+[-–—]\s+.*$', ''),   # trailing " - suffix"
        (r'[^\w\s]', ''),          # punctuation
        (r'\s+', ' '),             # whitespace runs
    )
    cleaned = (title or '').lower().strip()
    for pattern, replacement in rewrites:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
def catalog_overlap_score(owned_titles, candidate_titles, threshold: float = 0.85) -> int:
    """Count the OWNED album titles that also appear in a candidate's catalog.

    Titles on both sides are normalized with ``normalize_release_title`` and
    de-duplicated. An owned title counts when it equals a candidate title
    exactly or its ``SequenceMatcher`` ratio against any candidate title is at
    or above ``threshold``. Returns 0 when either side ends up empty. This is
    the disambiguation signal — higher means a better match.
    """
    def _normalized(titles):
        # Skip falsy inputs and drop anything that normalizes to ''.
        cleaned = (normalize_release_title(t) for t in (titles or []) if t)
        return {c for c in cleaned if c}

    owned_set = _normalized(owned_titles)
    candidate_set = _normalized(candidate_titles)
    if not (owned_set and candidate_set):
        return 0

    def _in_catalog(owned_title):
        if owned_title in candidate_set:
            return True
        return any(
            SequenceMatcher(None, owned_title, other).ratio() >= threshold
            for other in candidate_set
        )

    return sum(1 for owned_title in owned_set if _in_catalog(owned_title))
def pick_artist_by_catalog(candidates, owned_titles, fetch_titles) -> tuple:
    """Pick, among same-name candidates, the one whose catalog best overlaps
    the OWNED albums. Returns ``(chosen, overlap_score)``.

    ``candidates``   — artist objects already past the name gate, in the
                       worker's best-by-name order (``candidates[0]`` is the
                       default pick).
    ``owned_titles`` — the library artist's owned album titles.
    ``fetch_titles(candidate) -> list[str]`` — that candidate's album titles.
                       Only called when there are 2+ candidates AND at least
                       one owned title, so the single-candidate path makes no
                       extra calls. An exception from it is logged at debug
                       level and treated as an empty catalog.

    Returns ``(None, 0)`` for no candidates. Falls back to
    ``(candidates[0], 0)`` when there is nothing to disambiguate or no
    candidate overlaps the owned catalog. On a tie the earliest candidate wins.
    """
    pool = list(candidates or [])
    if not pool:
        return None, 0
    default_pick = pool[0]
    if len(pool) == 1:
        return default_pick, 0
    owned = [title for title in (owned_titles or []) if title]
    if not owned:
        return default_pick, 0

    def _overlap(candidate):
        try:
            titles = fetch_titles(candidate) or []
        except Exception as exc:
            logger.debug("catalog disambiguation: fetch_titles failed: %s", exc)
            titles = []
        return catalog_overlap_score(owned, titles)

    scores = [_overlap(candidate) for candidate in pool]
    top_score = max(scores)
    if top_score > 0:
        # list.index returns the first occurrence, i.e. the earliest
        # candidate that reached the top score.
        return pool[scores.index(top_score)], top_score
    return default_pick, 0  # no overlap signal → keep the best-by-name pick
def owned_album_titles(database, artist_id) -> list:
    """List the non-empty album titles stored in the library for ``artist_id``.

    Reads ``albums.title`` for the artist through
    ``database._get_connection()``. These titles are the ground truth used to
    disambiguate same-name source artists. Any failure is logged at debug
    level and yields an empty list.
    """
    query = "SELECT title FROM albums WHERE artist_id = ?"
    try:
        with database._get_connection() as conn:
            fetched = conn.execute(query, (artist_id,)).fetchall()
        titles = []
        for row in fetched:
            # Skip empty rows and NULL/blank titles.
            if row and row[0]:
                titles.append(row[0])
        return titles
    except Exception as exc:
        logger.debug("owned_album_titles(%s) failed: %s", artist_id, exc)
        return []
def release_titles(albums) -> list:
    """Extract titles from a list of albums — the candidate side of catalog
    disambiguation.

    Each item may be:
      * a dict  — ``'title'`` is used, falling back to ``'name'``;
      * a str   — taken as the title itself;
      * any other object — ``.title`` is used, falling back to ``.name``.

    Items with no truthy title are skipped. A falsy ``albums`` yields ``[]``.
    """
    out = []
    for al in albums or []:
        if isinstance(al, dict):
            t = al.get('title') or al.get('name')
        elif isinstance(al, str):
            # Without this branch getattr(al, 'title') would return the bound
            # method ``str.title`` and that method would be appended.
            t = al
        else:
            t = getattr(al, 'title', None) or getattr(al, 'name', None)
        if t:
            out.append(t)
    return out
def interruptible_sleep(stop_event: threading.Event, seconds: float, step: float = 0.5) -> bool:
    """Sleep in chunks so shutdown can interrupt long waits.

    Args:
        stop_event: event whose setting ends the sleep early.
        seconds: total time to wait; ``<= 0`` returns immediately.
        step: maximum length of a single wait. A non-positive ``step`` means
            "wait for the whole remaining time in one call".

    Returns:
        ``stop_event.is_set()`` at the moment of return, i.e. True when the
        sleep was (or already had been) interrupted.
    """
    if seconds <= 0:
        return stop_event.is_set()
    remaining = float(seconds)
    while remaining > 0 and not stop_event.is_set():
        # A zero or negative step would never shrink ``remaining`` and the
        # loop would spin forever, so fall back to one full-length wait.
        wait_for = remaining if step <= 0 else min(step, remaining)
        if stop_event.wait(wait_for):
            break
        remaining -= wait_for
    return stop_event.is_set()
def set_album_api_track_count(cursor, album_id, count):
    """Cache an album's authoritative track count from a metadata source.

    Called by enrichment workers (Spotify / iTunes / Deezer / Discogs) after
    they fetch album metadata. The count is the EXPECTED total tracks
    according to that source — distinct from `albums.track_count`, which
    server syncs populate with the OBSERVED count SoulSync already has
    indexed. The Album Completeness repair job reads
    `albums.api_track_count` as the expected total; populating it here during
    enrichment avoids a second round of API calls during the repair scan.

    Skips the write when the source didn't supply a positive numeric count
    (None, 0, negative, non-numeric, NaN or infinite) — that way a source
    lacking track info doesn't overwrite a good value another source already
    wrote. If multiple sources report different counts, last-write-wins.

    If the UPDATE fails because the `api_track_count` column is missing, the
    column is added with ALTER TABLE and the UPDATE is retried once. Any other
    SQL failure (or a failed repair) is logged as a warning and swallowed.

    Caller owns the cursor (and its connection / transaction) — this helper
    does not commit.

    Args:
        cursor: DB-API cursor positioned on the library database.
        album_id: value matched against `albums.id`.
        count: track count reported by the source; coerced with ``int()``.
    """
    try:
        count = int(count or 0)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float('inf')) — as unusable as non-numeric input.
        return
    if count <= 0:
        return
    # Swallow SQL errors — each worker batches several album UPDATEs into
    # one transaction, and we don't want a failure here (e.g., the
    # migration somehow hasn't run yet and the column is missing) to
    # rollback the worker's other writes (spotify_album_id, thumb_url,
    # etc.).
    try:
        cursor.execute(
            "UPDATE albums SET api_track_count = ? WHERE id = ?",
            (count, album_id),
        )
    except Exception as e:
        if "api_track_count" in str(e) and "no such column" in str(e).lower():
            # Self-heal a missing column, then retry the write once.
            try:
                cursor.execute("ALTER TABLE albums ADD COLUMN api_track_count INTEGER DEFAULT NULL")
                cursor.execute(
                    "UPDATE albums SET api_track_count = ? WHERE id = ?",
                    (count, album_id),
                )
                logger.info("Repaired missing api_track_count column while caching album track count")
                return
            except Exception as repair_error:
                # Report the repair failure rather than the original error.
                e = repair_error
        logger.warning(
            "Failed to cache api_track_count for album %s: %s", album_id, e
        )
# --- Enrichment "process this group first" override -----------------------
# Each enrichment worker normally processes artist -> album -> track. A user
# can pin one entity type to run first via the Manage Enrichment Workers modal;
# the choice is stored in config as "<service>_enrichment_priority" and read
# at the top of each worker's _get_next_item so it takes effect live. When the
# pinned group is exhausted (or unset), the worker falls back to its normal
# chain — so the default path is unchanged.
#
# PRIORITY_ENTITIES is the closed set of entity names accepted as a pin:
# read_enrichment_priority() returns '' for any other configured value and
# priority_pending_item() returns None for any other `entity`.
PRIORITY_ENTITIES = ('artist', 'album', 'track')
def read_enrichment_priority(service: str) -> str:
    """Return the entity pinned for ``service`` ('artist'|'album'|'track'), or ''.

    Looks up ``"<service>_enrichment_priority"`` through ``config_manager`` on
    every call, so a changed setting applies without restarting the worker.
    The stored value is trimmed and lowercased before validation. An unset or
    invalid value, or any error at all, yields '' (no override).
    """
    try:
        # Imported lazily inside the try so a failed import is also treated
        # as "no override".
        from config.settings import config_manager
        raw = config_manager.get(f'{service}_enrichment_priority', '')
        pinned = str(raw or '').strip().lower()
        if pinned in PRIORITY_ENTITIES:
            return pinned
        return ''
    except Exception:
        return ''
def priority_pending_item(cursor, service, entity, type_overrides=None):
    """Fetch one pending (NULL match_status) item of ``entity``, or None.

    ``service`` is the column prefix (e.g. 'spotify' -> spotify_match_status).
    It is interpolated into SQL, so it MUST be a trusted worker-supplied
    literal; anything that is not purely alphabetic is refused with None, as
    is an ``entity`` outside ``PRIORITY_ENTITIES``.

    ``type_overrides`` maps the canonical entity to the worker's dispatch
    'type' string (e.g. 'album' -> 'album_individual'); entities without an
    override keep their canonical name.

    Returns the lowest-id pending row as a dict: ``{'type', 'id', 'name'}``
    for artists, plus ``'artist'`` (the artist's name) for albums and tracks.
    """
    if not (str(service).isalpha() and entity in PRIORITY_ENTITIES):
        return None
    overrides = type_overrides or {}
    status_col = f"{service}_match_status"

    if entity == 'artist':
        query = (
            f"SELECT id, name FROM artists WHERE {status_col} IS NULL AND id IS NOT NULL "
            f"ORDER BY id ASC LIMIT 1"
        )
    elif entity == 'album':
        query = (
            f"SELECT a.id, a.title, ar.name FROM albums a JOIN artists ar ON a.artist_id = ar.id "
            f"WHERE a.{status_col} IS NULL AND a.id IS NOT NULL ORDER BY a.id ASC LIMIT 1"
        )
    else:  # track
        query = (
            f"SELECT t.id, t.title, ar.name FROM tracks t JOIN artists ar ON t.artist_id = ar.id "
            f"WHERE t.{status_col} IS NULL AND t.id IS NOT NULL ORDER BY t.id ASC LIMIT 1"
        )

    cursor.execute(query)
    row = cursor.fetchone()
    if not row:
        return None
    item = {'type': overrides.get(entity, entity), 'id': row[0], 'name': row[1]}
    if entity != 'artist':
        # Album and track rows carry the owning artist's name as a third column.
        item['artist'] = row[2]
    return item