mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
476 lines
18 KiB
476 lines
18 KiB
"""
|
|
Discogs background enrichment worker.
|
|
|
|
Enriches library artists and albums with Discogs metadata:
|
|
- Artists: discogs_id, bio, members, genres, styles, URLs, images
|
|
- Albums: discogs_id, genres, styles, label, catalog number, country, community rating
|
|
|
|
Follows the exact same pattern as AudioDBWorker.
|
|
"""
|
|
|
|
import json
|
|
import re
|
|
import threading
|
|
import time
|
|
from difflib import SequenceMatcher
|
|
from typing import Optional, Dict, Any
|
|
from datetime import datetime, timedelta
|
|
from utils.logging_config import get_logger
|
|
from database.music_database import MusicDatabase
|
|
from core.discogs_client import DiscogsClient
|
|
|
|
# Module-level logger, obtained from the project's get_logger helper under the
# name "discogs_worker"; every log call in this module goes through it.
logger = get_logger("discogs_worker")
|
|
|
|
|
|
class DiscogsWorker:
    """Background worker for enriching library artists and albums with Discogs metadata.

    One daemon thread repeatedly picks the next artist or album that has not
    been attempted yet (or whose 'not_found' result is older than
    ``retry_days``), looks it up through ``DiscogsClient`` and writes the
    result back to the library database.
    """

    # Longest uninterrupted nap (seconds) inside _sleep(). Bounds how long an
    # idle worker takes to notice a stop request, so stop()'s 5 second join
    # is not defeated by the 10 second idle sleep.
    _SLEEP_SLICE = 0.5

    def __init__(self, database: "MusicDatabase"):
        """Create a stopped worker bound to ``database``.

        Args:
            database: Library database. The worker only calls its
                ``_get_connection()`` method.
        """
        self.db = database
        self.client = DiscogsClient()

        # Worker state
        self.running = False
        self.paused = False
        self.should_stop = False
        self.thread = None

        # Current item being processed (for UI tooltip)
        self.current_item = None

        # Statistics
        self.stats = {
            'matched': 0,
            'not_found': 0,
            'pending': 0,
            'errors': 0
        }

        # Retry configuration: 'not_found' items are retried after this many days
        self.retry_days = 30

        # Name matching threshold (SequenceMatcher ratio, 0..1)
        self.name_similarity_threshold = 0.80

        logger.info(f"Discogs background worker initialized (authenticated: {self.client.is_authenticated()})")

    def start(self):
        """Start the background worker thread (no-op if already running)."""
        if self.running:
            logger.warning("Discogs worker already running")
            return

        self.running = True
        self.should_stop = False
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()
        logger.info("Discogs background worker started")

    def stop(self):
        """Request the worker to stop and wait up to 5 seconds for its thread."""
        if not self.running:
            return
        logger.info("Stopping Discogs worker...")
        self.should_stop = True
        self.running = False
        if self.thread:
            self.thread.join(timeout=5)
        logger.info("Discogs worker stopped")

    def pause(self):
        """Pause the worker (no-op if it is not running)."""
        if not self.running:
            return
        self.paused = True
        logger.info("Discogs worker paused")

    def resume(self):
        """Resume the worker (no-op if it is not running)."""
        if not self.running:
            return
        self.paused = False
        logger.info("Discogs worker resumed")

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of the worker state and counters.

        Refreshes ``stats['pending']`` from the database as a side effect.

        Returns:
            Dict with keys 'enabled', 'running', 'paused', 'idle',
            'current_item' and 'stats' (a copy of the counters).
        """
        self.stats['pending'] = self._count_pending_items()
        is_actually_running = self.running and (self.thread is not None and self.thread.is_alive())
        is_idle = is_actually_running and not self.paused and self.stats['pending'] == 0 and self.current_item is None

        return {
            'enabled': True,
            'running': is_actually_running and not self.paused,
            'paused': self.paused,
            'idle': is_idle,
            'current_item': self.current_item,
            'stats': self.stats.copy(),
        }

    def _sleep(self, seconds: float) -> None:
        """Sleep for up to ``seconds``, returning early once a stop is requested."""
        deadline = time.monotonic() + seconds
        while not self.should_stop:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            time.sleep(min(remaining, self._SLEEP_SLICE))

    def _run(self):
        """Main worker loop; runs on the worker thread until ``should_stop`` is set."""
        logger.info("Discogs worker thread started")

        while not self.should_stop:
            try:
                if self.paused:
                    self._sleep(1)
                    continue

                self.current_item = None
                item = self._get_next_item()

                if not item:
                    # Nothing to do right now; poll again later.
                    self._sleep(10)
                    continue

                self.current_item = item.get('name', '')

                # Guard: skip items with None/NULL IDs
                item_id = item.get('id')
                if item_id is None:
                    logger.warning(f"Skipping {item.get('type', 'unknown')} with NULL id")
                    # Pause before polling again so a row that keeps coming
                    # back cannot spin this loop at full speed.
                    self._sleep(2)
                    continue

                self._process_item(item)
                # Fixed delay between items.
                self._sleep(2)

            except Exception as e:
                logger.error(f"Error in Discogs worker loop: {e}")
                self._sleep(5)

        logger.info("Discogs worker thread finished")

    def _get_next_item(self) -> Optional[Dict[str, Any]]:
        """Get next item to process (artists → albums → retries).

        Returns:
            A dict with 'type' ('artist' or 'album'), 'id' and 'name' (albums
            also carry 'artist' and 'artist_discogs_id'), or None when there
            is nothing to process or the lookup failed.
        """
        conn = None
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()

            # Priority 1: Unattempted artists
            cursor.execute("""
                SELECT id, name FROM artists
                WHERE discogs_match_status IS NULL AND id IS NOT NULL
                ORDER BY id ASC LIMIT 1
            """)
            row = cursor.fetchone()
            if row:
                return {'type': 'artist', 'id': row[0], 'name': row[1]}

            # Priority 2: Unattempted albums
            cursor.execute("""
                SELECT a.id, a.title, ar.name AS artist_name, ar.discogs_id AS artist_discogs_id
                FROM albums a
                JOIN artists ar ON a.artist_id = ar.id
                WHERE a.discogs_match_status IS NULL AND a.id IS NOT NULL
                ORDER BY a.id ASC LIMIT 1
            """)
            row = cursor.fetchone()
            if row:
                return {'type': 'album', 'id': row[0], 'name': row[1], 'artist': row[2], 'artist_discogs_id': row[3]}

            # Priority 3: Retry 'not_found' artists after retry_days
            # NOTE(review): the cutoff is naive local time while the column is
            # written with SQLite CURRENT_TIMESTAMP — confirm the comparison
            # is intended to be approximate.
            not_found_cutoff = datetime.now() - timedelta(days=self.retry_days)
            cursor.execute("""
                SELECT id, name FROM artists
                WHERE discogs_match_status = 'not_found' AND discogs_last_attempted < ?
                ORDER BY discogs_last_attempted ASC LIMIT 1
            """, (not_found_cutoff,))
            row = cursor.fetchone()
            if row:
                return {'type': 'artist', 'id': row[0], 'name': row[1]}

            # Priority 4: Retry 'not_found' albums
            cursor.execute("""
                SELECT a.id, a.title, ar.name AS artist_name, ar.discogs_id AS artist_discogs_id
                FROM albums a
                JOIN artists ar ON a.artist_id = ar.id
                WHERE a.discogs_match_status = 'not_found' AND a.discogs_last_attempted < ?
                ORDER BY a.discogs_last_attempted ASC LIMIT 1
            """, (not_found_cutoff,))
            row = cursor.fetchone()
            if row:
                return {'type': 'album', 'id': row[0], 'name': row[1], 'artist': row[2], 'artist_discogs_id': row[3]}

            return None
        except Exception as e:
            logger.error(f"Error getting next Discogs item: {e}")
            return None
        finally:
            if conn:
                conn.close()

    def _count_pending_items(self) -> int:
        """Count artists and albums never attempted; returns 0 if the query fails."""
        conn = None
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM artists WHERE discogs_match_status IS NULL AND id IS NOT NULL")
            artists = cursor.fetchone()[0]
            cursor.execute("SELECT COUNT(*) FROM albums WHERE discogs_match_status IS NULL AND id IS NOT NULL")
            albums = cursor.fetchone()[0]
            return artists + albums
        except Exception as e:
            # Best effort: the count only feeds the stats display.
            logger.debug(f"Error counting pending Discogs items: {e}")
            return 0
        finally:
            # Close on every path so a failed query does not leak the connection.
            if conn:
                conn.close()

    def _normalize_name(self, name: str) -> str:
        """Normalize a name for comparison.

        Lowercases, drops everything after a spaced dash (" - ..."), removes
        parenthesised parts and punctuation, and collapses whitespace.
        """
        name = name.lower().strip()
        name = re.sub(r'\s+[-–—]\s+.*$', '', name)
        name = re.sub(r'\s*\(.*?\)\s*', ' ', name)
        name = re.sub(r'[^\w\s]', '', name)
        name = re.sub(r'\s+', ' ', name).strip()
        return name

    def _name_matches(self, query_name: str, result_name: str) -> bool:
        """Check if a Discogs result name matches our query with fuzzy matching.

        Returns True when the normalized names reach
        ``name_similarity_threshold``. Missing/empty names never match.
        """
        if not query_name or not result_name:
            return False

        norm_query = self._normalize_name(query_name)
        norm_result = self._normalize_name(result_name)

        if not norm_query or not norm_result:
            # Punctuation-only names ("+", "!!!") normalize to "", and two
            # empty strings have a similarity ratio of 1.0. Require an exact
            # case-insensitive match of the raw names instead, so unrelated
            # symbol-only titles are not treated as the same.
            return query_name.strip().casefold() == result_name.strip().casefold()

        similarity = SequenceMatcher(None, norm_query, norm_result).ratio()
        return similarity >= self.name_similarity_threshold

    def _process_item(self, item: Dict[str, Any]):
        """Process a single artist or album.

        Items that already carry a discogs_id are enriched by direct lookup
        and never searched by name; everything else goes through the search
        and match path. Any unexpected error is counted and the item is
        marked 'error'.
        """
        try:
            item_type = item['type']
            item_id = item['id']
            item_name = item['name']

            logger.debug(f"Processing {item_type} #{item_id}: {item_name}")

            # Check for existing discogs_id (manual match) — use direct lookup
            existing_id = self._get_existing_id(item_type, item_id)
            if existing_id:
                try:
                    if item_type == 'artist':
                        data = self.client._fetch_and_cache_artist(existing_id)
                        if data:
                            self._update_artist(item_id, data)
                            self.stats['matched'] += 1
                            logger.info(f"Enriched artist '{item_name}' from existing Discogs ID: {existing_id}")
                            return
                    elif item_type == 'album':
                        data = self.client._fetch_and_cache_album(existing_id)
                        if data:
                            self._update_album(item_id, data)
                            self.stats['matched'] += 1
                            logger.info(f"Enriched album '{item_name}' from existing Discogs ID: {existing_id}")
                            return
                except Exception as e:
                    logger.warning(f"Direct Discogs lookup failed for ID {existing_id}: {e}")
                # NOTE(review): when the direct lookup yields nothing the
                # match status is left untouched, so the same item may be
                # selected again on the next loop — confirm this is intended.
                return  # Preserve manual match, don't search

            if item_type == 'artist':
                self._search_and_match_artist(item_id, item_name)
            elif item_type == 'album':
                self._search_and_match_album(item_id, item_name, item.get('artist', ''), item.get('artist_discogs_id'))

        except Exception as e:
            logger.error(f"Error processing {item.get('type')} #{item.get('id')}: {e}")
            self.stats['errors'] += 1
            try:
                self._mark_status(item['type'], item['id'], 'error')
            except Exception:
                # Best effort: the failure itself has already been logged above.
                pass

    def _get_existing_id(self, entity_type: str, entity_id) -> Optional[str]:
        """Return the entity's stored discogs_id, or None if unset or on error."""
        table = 'artists' if entity_type == 'artist' else 'albums'
        conn = None
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()
            # `table` is one of two fixed literals, never caller-supplied text.
            cursor.execute(f"SELECT discogs_id FROM {table} WHERE id = ?", (entity_id,))
            row = cursor.fetchone()
            return row[0] if row and row[0] else None
        except Exception as e:
            logger.debug(f"Error reading existing Discogs ID for {entity_type} #{entity_id}: {e}")
            return None
        finally:
            # Close on every path so a failed query does not leak the connection.
            if conn:
                conn.close()

    def _search_and_match_artist(self, artist_id, artist_name: str):
        """Search Discogs for an artist and store metadata if matched.

        The first search result whose name matches and whose detail fetch
        succeeds wins; otherwise the artist is marked 'not_found'.
        """
        results = self.client.search_artists(artist_name, limit=5)
        if not results:
            self._mark_status('artist', artist_id, 'not_found')
            self.stats['not_found'] += 1
            return

        # Find best match by name similarity
        for result in results:
            if self._name_matches(artist_name, result.name):
                # Fetch full artist detail (uses cache)
                data = self.client._fetch_and_cache_artist(result.id)
                if data:
                    self._update_artist(artist_id, data)
                    self.stats['matched'] += 1
                    logger.info(f"Matched artist '{artist_name}' -> Discogs ID: {result.id}")
                    return

        self._mark_status('artist', artist_id, 'not_found')
        self.stats['not_found'] += 1
        logger.debug(f"No confident match for artist '{artist_name}'")

    def _search_and_match_album(self, album_id, album_name: str, artist_name: str, artist_discogs_id: str = None):
        """Search Discogs for an album and store metadata if matched.

        ``artist_discogs_id`` is accepted for interface compatibility but is
        not used by the search.
        """
        # Search with artist + album for better precision
        query = f"{artist_name} {album_name}" if artist_name else album_name
        results = self.client.search_albums(query, limit=5)
        if not results:
            self._mark_status('album', album_id, 'not_found')
            self.stats['not_found'] += 1
            return

        for result in results:
            if self._name_matches(album_name, result.name):
                # Fetch full release detail (uses cache)
                data = self.client._fetch_and_cache_album(result.id)
                if data:
                    self._update_album(album_id, data)
                    self.stats['matched'] += 1
                    logger.info(f"Matched album '{album_name}' -> Discogs ID: {result.id}")
                    return

        self._mark_status('album', album_id, 'not_found')
        self.stats['not_found'] += 1
        logger.debug(f"No confident match for album '{album_name}'")

    def _update_artist(self, artist_id, data: Dict[str, Any]):
        """Store Discogs metadata for an artist and mark it 'matched'.

        Also backfills ``summary`` and ``thumb_url`` when they are empty.

        Raises:
            Exception: any database error is logged and re-raised.
        """
        conn = None
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()

            discogs_id = str(data.get('id', ''))
            bio = data.get('profile', '')
            members = json.dumps([m.get('name', '') for m in data.get('members', [])]) if data.get('members') else None
            urls = json.dumps(data.get('urls', [])) if data.get('urls') else None

            # Get image: prefer the one typed 'primary', else the first one
            image_url = None
            images = data.get('images', [])
            if images:
                primary = next((img for img in images if img.get('type') == 'primary'), None)
                image_url = (primary or images[0]).get('uri')

            cursor.execute("""
                UPDATE artists SET
                    discogs_id = ?,
                    discogs_match_status = 'matched',
                    discogs_last_attempted = CURRENT_TIMESTAMP,
                    discogs_bio = ?,
                    discogs_members = ?,
                    discogs_urls = ?,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            """, (discogs_id, bio, members, urls, artist_id))

            # Backfill summary/bio if empty (AudioDB backfill)
            if bio:
                cursor.execute("""
                    UPDATE artists SET summary = ?
                    WHERE id = ? AND (summary IS NULL OR summary = '')
                """, (bio, artist_id))

            # Backfill thumb_url if empty
            if image_url:
                cursor.execute("""
                    UPDATE artists SET thumb_url = ?
                    WHERE id = ? AND (thumb_url IS NULL OR thumb_url = '')
                """, (image_url, artist_id))

            conn.commit()

        except Exception as e:
            logger.error(f"Error updating artist #{artist_id} with Discogs data: {e}")
            raise
        finally:
            if conn:
                conn.close()

    def _update_album(self, album_id, data: Dict[str, Any]):
        """Store Discogs metadata for an album and mark it 'matched'.

        Also backfills ``genres`` and ``thumb_url`` when they are empty.

        Raises:
            Exception: any database error is logged and re-raised.
        """
        conn = None
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()

            discogs_id = str(data.get('id', ''))
            # `or` also covers keys that are present but None, which would
            # otherwise be stored as the JSON text "null".
            genres = json.dumps(data.get('genres') or [])
            styles = json.dumps(data.get('styles') or [])
            labels = data.get('labels', [])
            label = labels[0].get('name', '') if labels else ''
            catno = labels[0].get('catno', '') if labels else ''
            country = data.get('country', '')

            # Community rating (tolerate a missing or None 'community'/'rating')
            community = data.get('community') or {}
            rating = community.get('rating') or {}
            rating_avg = rating.get('average', 0)
            rating_count = rating.get('count', 0)

            # Image: prefer the one typed 'primary', else the first one
            image_url = None
            images = data.get('images', [])
            if images:
                primary = next((img for img in images if img.get('type') == 'primary'), None)
                image_url = (primary or images[0]).get('uri')

            cursor.execute("""
                UPDATE albums SET
                    discogs_id = ?,
                    discogs_match_status = 'matched',
                    discogs_last_attempted = CURRENT_TIMESTAMP,
                    discogs_genres = ?,
                    discogs_styles = ?,
                    discogs_label = ?,
                    discogs_catno = ?,
                    discogs_country = ?,
                    discogs_rating = ?,
                    discogs_rating_count = ?,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            """, (discogs_id, genres, styles, label, catno, country, rating_avg, rating_count, album_id))

            # Backfill genres if empty
            if data.get('genres'):
                cursor.execute("""
                    UPDATE albums SET genres = ?
                    WHERE id = ? AND (genres IS NULL OR genres = '' OR genres = '[]')
                """, (genres, album_id))

            # Backfill thumb_url if empty
            if image_url:
                cursor.execute("""
                    UPDATE albums SET thumb_url = ?
                    WHERE id = ? AND (thumb_url IS NULL OR thumb_url = '')
                """, (image_url, album_id))

            conn.commit()

        except Exception as e:
            logger.error(f"Error updating album #{album_id} with Discogs data: {e}")
            raise
        finally:
            if conn:
                conn.close()

    def _mark_status(self, entity_type: str, entity_id, status: str):
        """Set the entity's Discogs match status and stamp the attempt time.

        Unknown entity types are ignored; database errors are logged, not raised.
        """
        table = {'artist': 'artists', 'album': 'albums'}.get(entity_type)
        if not table:
            return
        conn = None
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()
            # `table` comes from the fixed mapping above, never from caller text.
            cursor.execute(f"""
                UPDATE {table} SET
                    discogs_match_status = ?,
                    discogs_last_attempted = CURRENT_TIMESTAMP
                WHERE id = ?
            """, (status, entity_id))
            conn.commit()
        except Exception as e:
            logger.error(f"Error marking {entity_type} #{entity_id} status: {e}")
        finally:
            # Close on every path so a failed update does not leak the connection.
            if conn:
                conn.close()