You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
SoulSync/core/discogs_worker.py

476 lines
18 KiB

"""
Discogs background enrichment worker.
Enriches library artists and albums with Discogs metadata:
- Artists: discogs_id, bio, members, URLs, image (summary and thumbnail are backfilled when empty)
- Albums: discogs_id, genres, styles, label, catalog number, country, community rating
Follows the exact same pattern as AudioDBWorker.
"""
import json
import re
import threading
import time
from difflib import SequenceMatcher
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from utils.logging_config import get_logger
from database.music_database import MusicDatabase
from core.discogs_client import DiscogsClient
# Module-level logger used by all DiscogsWorker methods below.
logger = get_logger("discogs_worker")
class DiscogsWorker:
    """Background worker for enriching library artists and albums with Discogs metadata.

    A single daemon thread repeatedly picks the next artist or album that
    still needs enrichment, looks it up through ``self.client`` and writes the
    result back through ``self.db``.  Every processed row ends up with a
    ``discogs_match_status`` of ``'matched'``, ``'not_found'`` or ``'error'``;
    ``'not_found'`` rows are retried once ``retry_days`` days have passed.
    """

    def __init__(self, database: "MusicDatabase"):
        """Create the worker (not started yet).

        Args:
            database: Library database; only ``_get_connection()`` is used.
        """
        self.db = database
        self.client = DiscogsClient()
        # Worker state
        self.running = False
        self.paused = False
        self.should_stop = False
        self.thread = None
        # Current item being processed (for UI tooltip)
        self.current_item = None
        # Statistics
        self.stats = {
            'matched': 0,
            'not_found': 0,
            'pending': 0,
            'errors': 0
        }
        # Retry configuration: 'not_found' rows are retried after this many days
        self.retry_days = 30
        # Name matching threshold (SequenceMatcher ratio, 0..1)
        self.name_similarity_threshold = 0.80
        logger.info(f"Discogs background worker initialized (authenticated: {self.client.is_authenticated()})")

    def start(self):
        """Start the background worker thread (no-op if already running)."""
        if self.running:
            logger.warning("Discogs worker already running")
            return
        self.running = True
        self.should_stop = False
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()
        logger.info("Discogs background worker started")

    def stop(self):
        """Ask the worker thread to stop and wait up to 5 seconds for it."""
        if not self.running:
            return
        logger.info("Stopping Discogs worker...")
        self.should_stop = True
        self.running = False
        if self.thread:
            self.thread.join(timeout=5)
        logger.info("Discogs worker stopped")

    def pause(self):
        """Pause the worker (the thread keeps running but processes nothing)."""
        if not self.running:
            return
        self.paused = True
        logger.info("Discogs worker paused")

    def resume(self):
        """Resume a paused worker."""
        if not self.running:
            return
        self.paused = False
        logger.info("Discogs worker resumed")

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of the worker state and counters.

        Returns:
            Dict with ``enabled``, ``running``, ``paused``, ``idle``,
            ``current_item`` and a copy of the ``stats`` counters (with
            ``pending`` refreshed from the database).
        """
        self.stats['pending'] = self._count_pending_items()
        is_actually_running = self.running and (self.thread is not None and self.thread.is_alive())
        is_idle = is_actually_running and not self.paused and self.stats['pending'] == 0 and self.current_item is None
        return {
            'enabled': True,
            'running': is_actually_running and not self.paused,
            'paused': self.paused,
            'idle': is_idle,
            'current_item': self.current_item,
            'stats': self.stats.copy(),
        }

    def _run(self):
        """Main worker loop; runs on the background thread until ``should_stop`` is set."""
        logger.info("Discogs worker thread started")
        while not self.should_stop:
            try:
                if self.paused:
                    time.sleep(1)
                    continue
                self.current_item = None
                item = self._get_next_item()
                if not item:
                    # Nothing to enrich right now; poll again later.
                    time.sleep(10)
                    continue
                self.current_item = item.get('name', '')
                # Guard: skip items with None/NULL IDs
                item_id = item.get('id')
                if item_id is None:
                    logger.warning(f"Skipping {item.get('type', 'unknown')} with NULL id")
                    continue
                self._process_item(item)
                # Pause between items so requests are spaced out.
                time.sleep(2)
            except Exception as e:
                logger.error(f"Error in Discogs worker loop: {e}")
                time.sleep(5)
        logger.info("Discogs worker thread finished")

    def _get_next_item(self) -> Optional[Dict[str, Any]]:
        """Get next item to process (artists → albums → retries).

        Returns:
            ``{'type': 'artist', 'id', 'name'}`` or
            ``{'type': 'album', 'id', 'name', 'artist', 'artist_discogs_id'}``,
            or ``None`` when nothing is due or the query failed.
        """
        conn = None
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()
            # Priority 1: Unattempted artists
            cursor.execute("""
                SELECT id, name FROM artists
                WHERE discogs_match_status IS NULL AND id IS NOT NULL
                ORDER BY id ASC LIMIT 1
            """)
            row = cursor.fetchone()
            if row:
                return {'type': 'artist', 'id': row[0], 'name': row[1]}
            # Priority 2: Unattempted albums
            cursor.execute("""
                SELECT a.id, a.title, ar.name AS artist_name, ar.discogs_id AS artist_discogs_id
                FROM albums a
                JOIN artists ar ON a.artist_id = ar.id
                WHERE a.discogs_match_status IS NULL AND a.id IS NOT NULL
                ORDER BY a.id ASC LIMIT 1
            """)
            row = cursor.fetchone()
            if row:
                return {'type': 'album', 'id': row[0], 'name': row[1], 'artist': row[2], 'artist_discogs_id': row[3]}
            # The cutoff is computed inside SQLite so it is in UTC, the same
            # clock CURRENT_TIMESTAMP uses when discogs_last_attempted is
            # written (a local-time datetime.now() would be skewed by the UTC
            # offset and relies on the deprecated sqlite3 datetime adapter).
            retry_modifier = f"-{self.retry_days} days"
            # Priority 3: Retry 'not_found' artists after retry_days
            cursor.execute("""
                SELECT id, name FROM artists
                WHERE discogs_match_status = 'not_found'
                  AND discogs_last_attempted < datetime('now', ?)
                ORDER BY discogs_last_attempted ASC LIMIT 1
            """, (retry_modifier,))
            row = cursor.fetchone()
            if row:
                return {'type': 'artist', 'id': row[0], 'name': row[1]}
            # Priority 4: Retry 'not_found' albums
            cursor.execute("""
                SELECT a.id, a.title, ar.name AS artist_name, ar.discogs_id AS artist_discogs_id
                FROM albums a
                JOIN artists ar ON a.artist_id = ar.id
                WHERE a.discogs_match_status = 'not_found'
                  AND a.discogs_last_attempted < datetime('now', ?)
                ORDER BY a.discogs_last_attempted ASC LIMIT 1
            """, (retry_modifier,))
            row = cursor.fetchone()
            if row:
                return {'type': 'album', 'id': row[0], 'name': row[1], 'artist': row[2], 'artist_discogs_id': row[3]}
            return None
        except Exception as e:
            logger.error(f"Error getting next Discogs item: {e}")
            return None
        finally:
            if conn is not None:
                conn.close()

    def _count_pending_items(self) -> int:
        """Count never-attempted artists and albums; returns 0 if the query fails."""
        conn = None
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM artists WHERE discogs_match_status IS NULL AND id IS NOT NULL")
            artists = cursor.fetchone()[0]
            cursor.execute("SELECT COUNT(*) FROM albums WHERE discogs_match_status IS NULL AND id IS NOT NULL")
            albums = cursor.fetchone()[0]
            return artists + albums
        except Exception as e:
            # Best-effort counter for status display; never propagate.
            logger.debug(f"Error counting pending Discogs items: {e}")
            return 0
        finally:
            # Close on every path so a failed query does not leak the connection.
            if conn is not None:
                conn.close()

    def _normalize_name(self, name: str) -> str:
        """Normalize a name for comparison.

        Lower-cases, drops a trailing `` - suffix`` and any parenthesised part,
        removes punctuation and collapses whitespace.  ``None``/empty input
        yields ``''``.
        """
        if not name:
            return ''
        name = name.lower().strip()
        name = re.sub(r'\s+[-–—]\s+.*$', '', name)
        name = re.sub(r'\s*\(.*?\)\s*', ' ', name)
        name = re.sub(r'[^\w\s]', '', name)
        name = re.sub(r'\s+', ' ', name).strip()
        return name

    @staticmethod
    def _raw_name_key(name) -> str:
        """Lower-cased name with whitespace collapsed and a trailing "(N)" suffix removed."""
        raw = re.sub(r'\s+', ' ', (name or '').strip().lower())
        return re.sub(r'\s*\(\d+\)$', '', raw)

    def _name_matches(self, query_name: str, result_name: str) -> bool:
        """Check if Discogs result name matches our query with fuzzy matching.

        Returns:
            True when the normalized names reach ``name_similarity_threshold``.
            If either name normalizes to an empty string (e.g. "!!!" or
            "( )"), fuzzy matching would score two empty strings as identical,
            so an exact comparison of the raw names is used instead.
        """
        norm_query = self._normalize_name(query_name)
        norm_result = self._normalize_name(result_name)
        if not norm_query or not norm_result:
            raw_query = self._raw_name_key(query_name)
            return bool(raw_query) and raw_query == self._raw_name_key(result_name)
        similarity = SequenceMatcher(None, norm_query, norm_result).ratio()
        return similarity >= self.name_similarity_threshold

    def _process_item(self, item: Dict[str, Any]):
        """Process a single artist or album.

        Items that already carry a ``discogs_id`` (manual match) are enriched
        by direct lookup and never searched.  Any failure marks the item
        ``'error'`` so the queue can move on to the next one.
        """
        try:
            item_type = item['type']
            item_id = item['id']
            item_name = item['name']
            logger.debug(f"Processing {item_type} #{item_id}: {item_name}")
            # Check for existing discogs_id (manual match) — use direct lookup
            existing_id = self._get_existing_id(item_type, item_id)
            if existing_id:
                if not self._enrich_from_existing_id(item_type, item_id, item_name, existing_id):
                    # Record the failure; otherwise the row keeps a NULL status
                    # and is selected again on every cycle, blocking the queue.
                    # Only the status changes, so the manual discogs_id is kept.
                    self.stats['errors'] += 1
                    self._mark_status(item_type, item_id, 'error')
                return  # Preserve manual match, don't search
            if item_type == 'artist':
                self._search_and_match_artist(item_id, item_name)
            elif item_type == 'album':
                self._search_and_match_album(item_id, item_name, item.get('artist', ''), item.get('artist_discogs_id'))
        except Exception as e:
            logger.error(f"Error processing {item.get('type')} #{item.get('id')}: {e}")
            self.stats['errors'] += 1
            failed_type, failed_id = item.get('type'), item.get('id')
            if failed_type and failed_id is not None:
                # _mark_status logs and swallows its own database errors.
                self._mark_status(failed_type, failed_id, 'error')

    def _enrich_from_existing_id(self, item_type: str, item_id, item_name: str, existing_id) -> bool:
        """Fetch and store metadata for an item by its stored Discogs ID; True on success."""
        data = None
        try:
            if item_type == 'artist':
                data = self.client._fetch_and_cache_artist(existing_id)
                if data:
                    self._update_artist(item_id, data)
            elif item_type == 'album':
                data = self.client._fetch_and_cache_album(existing_id)
                if data:
                    self._update_album(item_id, data)
        except Exception as e:
            logger.warning(f"Direct Discogs lookup failed for ID {existing_id}: {e}")
            return False
        if not data:
            logger.warning(f"Direct Discogs lookup returned no data for ID {existing_id}")
            return False
        self.stats['matched'] += 1
        logger.info(f"Enriched {item_type} '{item_name}' from existing Discogs ID: {existing_id}")
        return True

    def _get_existing_id(self, entity_type: str, entity_id) -> Optional[str]:
        """Return the entity's stored ``discogs_id``, or None if unset or on error."""
        table = 'artists' if entity_type == 'artist' else 'albums'
        conn = None
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()
            # `table` is one of two fixed literals, never caller-supplied text.
            cursor.execute(f"SELECT discogs_id FROM {table} WHERE id = ?", (entity_id,))
            row = cursor.fetchone()
            return row[0] if row and row[0] else None
        except Exception as e:
            logger.debug(f"Error reading discogs_id for {entity_type} #{entity_id}: {e}")
            return None
        finally:
            if conn is not None:
                conn.close()

    def _search_and_match_artist(self, artist_id, artist_name: str):
        """Search Discogs for an artist and store metadata if matched."""
        results = self.client.search_artists(artist_name, limit=5)
        if not results:
            self._mark_status('artist', artist_id, 'not_found')
            self.stats['not_found'] += 1
            return
        # Take the first result whose name is close enough and has detail data
        for result in results:
            if self._name_matches(artist_name, result.name):
                # Fetch full artist detail (uses cache)
                data = self.client._fetch_and_cache_artist(result.id)
                if data:
                    self._update_artist(artist_id, data)
                    self.stats['matched'] += 1
                    logger.info(f"Matched artist '{artist_name}' -> Discogs ID: {result.id}")
                    return
        self._mark_status('artist', artist_id, 'not_found')
        self.stats['not_found'] += 1
        logger.debug(f"No confident match for artist '{artist_name}'")

    def _search_and_match_album(self, album_id, album_name: str, artist_name: str, artist_discogs_id: str = None):
        """Search Discogs for an album and store metadata if matched.

        ``artist_discogs_id`` is accepted for callers but currently unused.
        """
        # Search with artist + album for better precision
        query = f"{artist_name} {album_name}" if artist_name else album_name
        results = self.client.search_albums(query, limit=5)
        if not results:
            self._mark_status('album', album_id, 'not_found')
            self.stats['not_found'] += 1
            return
        # Take the first result whose title is close enough and has detail data
        for result in results:
            if self._name_matches(album_name, result.name):
                # Fetch full release detail (uses cache)
                data = self.client._fetch_and_cache_album(result.id)
                if data:
                    self._update_album(album_id, data)
                    self.stats['matched'] += 1
                    logger.info(f"Matched album '{album_name}' -> Discogs ID: {result.id}")
                    return
        self._mark_status('album', album_id, 'not_found')
        self.stats['not_found'] += 1
        logger.debug(f"No confident match for album '{album_name}'")

    @staticmethod
    def _pick_image_url(data: Dict[str, Any]) -> Optional[str]:
        """Return the ``uri`` of the 'primary' image, else of the first image, else None."""
        images = data.get('images') or []
        if not images:
            return None
        primary = next((img for img in images if img.get('type') == 'primary'), None)
        return (primary or images[0]).get('uri')

    def _update_artist(self, artist_id, data: Dict[str, Any]):
        """Store Discogs metadata for an artist and mark it 'matched'.

        Also backfills ``summary`` and ``thumb_url`` when they are empty.

        Raises:
            Exception: any database error is logged and re-raised.
        """
        conn = None
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()
            discogs_id = str(data.get('id', ''))
            bio = data.get('profile', '')
            members = json.dumps([m.get('name', '') for m in data.get('members', [])]) if data.get('members') else None
            urls = json.dumps(data.get('urls', [])) if data.get('urls') else None
            image_url = self._pick_image_url(data)
            cursor.execute("""
                UPDATE artists SET
                    discogs_id = ?,
                    discogs_match_status = 'matched',
                    discogs_last_attempted = CURRENT_TIMESTAMP,
                    discogs_bio = ?,
                    discogs_members = ?,
                    discogs_urls = ?,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            """, (discogs_id, bio, members, urls, artist_id))
            # Backfill summary/bio if empty
            if bio:
                cursor.execute("""
                    UPDATE artists SET summary = ?
                    WHERE id = ? AND (summary IS NULL OR summary = '')
                """, (bio, artist_id))
            # Backfill thumb_url if empty
            if image_url:
                cursor.execute("""
                    UPDATE artists SET thumb_url = ?
                    WHERE id = ? AND (thumb_url IS NULL OR thumb_url = '')
                """, (image_url, artist_id))
            conn.commit()
        except Exception as e:
            logger.error(f"Error updating artist #{artist_id} with Discogs data: {e}")
            raise
        finally:
            if conn is not None:
                conn.close()

    def _update_album(self, album_id, data: Dict[str, Any]):
        """Store Discogs metadata for an album and mark it 'matched'.

        Also backfills ``genres`` and ``thumb_url`` when they are empty.

        Raises:
            Exception: any database error is logged and re-raised.
        """
        conn = None
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()
            discogs_id = str(data.get('id', ''))
            genres = json.dumps(data.get('genres', []))
            styles = json.dumps(data.get('styles', []))
            labels = data.get('labels', [])
            label = labels[0].get('name', '') if labels else ''
            catno = labels[0].get('catno', '') if labels else ''
            country = data.get('country', '')
            # Community rating; `or {}` also covers keys present with a None value
            community = data.get('community') or {}
            rating = community.get('rating') or {}
            rating_avg = rating.get('average', 0)
            rating_count = rating.get('count', 0)
            image_url = self._pick_image_url(data)
            cursor.execute("""
                UPDATE albums SET
                    discogs_id = ?,
                    discogs_match_status = 'matched',
                    discogs_last_attempted = CURRENT_TIMESTAMP,
                    discogs_genres = ?,
                    discogs_styles = ?,
                    discogs_label = ?,
                    discogs_catno = ?,
                    discogs_country = ?,
                    discogs_rating = ?,
                    discogs_rating_count = ?,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            """, (discogs_id, genres, styles, label, catno, country, rating_avg, rating_count, album_id))
            # Backfill genres if empty
            if data.get('genres'):
                cursor.execute("""
                    UPDATE albums SET genres = ?
                    WHERE id = ? AND (genres IS NULL OR genres = '' OR genres = '[]')
                """, (genres, album_id))
            # Backfill thumb_url if empty
            if image_url:
                cursor.execute("""
                    UPDATE albums SET thumb_url = ?
                    WHERE id = ? AND (thumb_url IS NULL OR thumb_url = '')
                """, (image_url, album_id))
            conn.commit()
        except Exception as e:
            logger.error(f"Error updating album #{album_id} with Discogs data: {e}")
            raise
        finally:
            if conn is not None:
                conn.close()

    def _mark_status(self, entity_type: str, entity_id, status: str):
        """Set the entity's Discogs match status and stamp the attempt time.

        Unknown entity types are ignored; database errors are logged, not raised.
        """
        table = {'artist': 'artists', 'album': 'albums'}.get(entity_type)
        if not table:
            return
        conn = None
        try:
            conn = self.db._get_connection()
            cursor = conn.cursor()
            cursor.execute(f"""
                UPDATE {table} SET
                    discogs_match_status = ?,
                    discogs_last_attempted = CURRENT_TIMESTAMP
                WHERE id = ?
            """, (status, entity_id))
            conn.commit()
        except Exception as e:
            logger.error(f"Error marking {entity_type} #{entity_id} status: {e}")
        finally:
            # Close on every path so a failed update does not leak the connection.
            if conn is not None:
                conn.close()