You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/amazon_worker.py

603 lines
24 KiB

import re
import threading
import time
from difflib import SequenceMatcher
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from utils.logging_config import get_logger
from database.music_database import MusicDatabase
from core.amazon_client import AmazonClient
from core.worker_utils import interruptible_sleep, set_album_api_track_count
from core.enrichment.manual_match_honoring import honor_stored_match
logger = get_logger("amazon_worker")
class AmazonWorker:
"""Background worker for enriching library artists, albums, and tracks with Amazon Music metadata."""
def __init__(self, database: MusicDatabase):
self.db = database
self.client = AmazonClient()
self._amazon_schema_checked = False
self.running = False
self.paused = False
self.should_stop = False
self.thread = None
self._stop_event = threading.Event()
self.current_item = None
self.stats = {
'matched': 0,
'not_found': 0,
'pending': 0,
'errors': 0,
}
self.retry_days = 30
self.name_similarity_threshold = 0.80
logger.info("Amazon background worker initialized")
def _ensure_amazon_schema(self, cursor) -> None:
"""Ensure upgraded installs have the Amazon enrichment columns.
MusicDatabase normally runs this migration during startup, but the
worker should still be defensive because it is the code path that
repeatedly queries these columns in the background.
"""
if self._amazon_schema_checked:
return
table_columns = {
'artists': ('amazon_id', 'amazon_match_status', 'amazon_last_attempted'),
'albums': ('amazon_id', 'amazon_match_status', 'amazon_last_attempted'),
'tracks': ('amazon_id', 'amazon_match_status', 'amazon_last_attempted'),
}
for table, columns in table_columns.items():
cursor.execute(f"PRAGMA table_info({table})")
existing = {row[1] for row in cursor.fetchall()}
for column in columns:
if column not in existing:
column_type = 'TIMESTAMP' if column == 'amazon_last_attempted' else 'TEXT'
cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {column_type}")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_artists_amazon_id ON artists (amazon_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_artists_amazon_status ON artists (amazon_match_status)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_albums_amazon_id ON albums (amazon_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_albums_amazon_status ON albums (amazon_match_status)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_tracks_amazon_id ON tracks (amazon_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_tracks_amazon_status ON tracks (amazon_match_status)")
cursor.connection.commit()
self._amazon_schema_checked = True
def start(self):
if self.running:
logger.warning("Worker already running")
return
self.running = True
self.should_stop = False
self._stop_event.clear()
self.thread = threading.Thread(target=self._run, daemon=True)
self.thread.start()
logger.info("Amazon background worker started")
def stop(self):
if not self.running:
return
logger.info("Stopping Amazon worker...")
self.should_stop = True
self.running = False
self._stop_event.set()
if self.thread:
self.thread.join(timeout=1)
logger.info("Amazon worker stopped")
def pause(self):
if not self.running:
logger.warning("Worker not running, cannot pause")
return
self.paused = True
logger.info("Amazon worker paused")
def resume(self):
if not self.running:
logger.warning("Worker not running, start it first")
return
self.paused = False
logger.info("Amazon worker resumed")
def get_stats(self) -> Dict[str, Any]:
self.stats['pending'] = self._count_pending_items()
progress = self._get_progress_breakdown()
is_actually_running = self.running and (self.thread is not None and self.thread.is_alive())
is_idle = is_actually_running and not self.paused and self.stats['pending'] == 0 and self.current_item is None
return {
'enabled': True,
'running': is_actually_running and not self.paused,
'paused': self.paused,
'idle': is_idle,
'current_item': self.current_item,
'stats': self.stats.copy(),
'progress': progress,
}
def _run(self):
logger.info("Amazon worker thread started")
while not self.should_stop:
try:
if self.paused:
interruptible_sleep(self._stop_event, 1)
continue
self.current_item = None
item = self._get_next_item()
if not item:
logger.debug("No pending items, sleeping...")
interruptible_sleep(self._stop_event, 10)
continue
self.current_item = item
item_id = item.get('id') or item.get('artist_id') or item.get('album_id')
if item_id is None:
logger.warning(f"Skipping {item.get('type', 'unknown')} with NULL id: {item.get('name', '?')}")
continue
self._process_item(item)
interruptible_sleep(self._stop_event, 2)
except Exception as e:
logger.error(f"Error in worker loop: {e}")
interruptible_sleep(self._stop_event, 5)
logger.info("Amazon worker thread finished")
def _get_next_item(self) -> Optional[Dict[str, Any]]:
conn = None
try:
conn = self.db._get_connection()
cursor = conn.cursor()
self._ensure_amazon_schema(cursor)
# Priority 1: Unattempted artists
cursor.execute("""
SELECT id, name FROM artists
WHERE amazon_match_status IS NULL AND id IS NOT NULL
ORDER BY id ASC LIMIT 1
""")
row = cursor.fetchone()
if row:
return {'type': 'artist', 'id': row[0], 'name': row[1]}
# Priority 2: Unattempted albums
cursor.execute("""
SELECT a.id, a.title, ar.name AS artist_name, ar.amazon_id AS artist_amazon_id
FROM albums a
JOIN artists ar ON a.artist_id = ar.id
WHERE a.amazon_match_status IS NULL AND a.id IS NOT NULL
ORDER BY a.id ASC LIMIT 1
""")
row = cursor.fetchone()
if row:
return {'type': 'album', 'id': row[0], 'name': row[1], 'artist': row[2], 'artist_amazon_id': row[3]}
# Priority 3: Unattempted tracks
cursor.execute("""
SELECT t.id, t.title, ar.name AS artist_name, ar.amazon_id AS artist_amazon_id
FROM tracks t
JOIN artists ar ON t.artist_id = ar.id
WHERE t.amazon_match_status IS NULL AND t.id IS NOT NULL
ORDER BY t.id ASC LIMIT 1
""")
row = cursor.fetchone()
if row:
return {'type': 'track', 'id': row[0], 'name': row[1], 'artist': row[2], 'artist_amazon_id': row[3]}
not_found_cutoff = datetime.now() - timedelta(days=self.retry_days)
# Priority 4: Retry not_found artists
cursor.execute("""
SELECT id, name FROM artists
WHERE amazon_match_status = 'not_found' AND amazon_last_attempted < ?
ORDER BY amazon_last_attempted ASC LIMIT 1
""", (not_found_cutoff,))
row = cursor.fetchone()
if row:
logger.info(f"Retrying artist '{row[1]}' (last attempted before cutoff)")
return {'type': 'artist', 'id': row[0], 'name': row[1]}
# Priority 5: Retry not_found albums
cursor.execute("""
SELECT a.id, a.title, ar.name AS artist_name, ar.amazon_id AS artist_amazon_id
FROM albums a
JOIN artists ar ON a.artist_id = ar.id
WHERE a.amazon_match_status = 'not_found' AND a.amazon_last_attempted < ?
ORDER BY a.amazon_last_attempted ASC LIMIT 1
""", (not_found_cutoff,))
row = cursor.fetchone()
if row:
return {'type': 'album', 'id': row[0], 'name': row[1], 'artist': row[2], 'artist_amazon_id': row[3]}
# Priority 6: Retry not_found tracks
cursor.execute("""
SELECT t.id, t.title, ar.name AS artist_name, ar.amazon_id AS artist_amazon_id
FROM tracks t
JOIN artists ar ON t.artist_id = ar.id
WHERE t.amazon_match_status = 'not_found' AND t.amazon_last_attempted < ?
ORDER BY t.amazon_last_attempted ASC LIMIT 1
""", (not_found_cutoff,))
row = cursor.fetchone()
if row:
return {'type': 'track', 'id': row[0], 'name': row[1], 'artist': row[2], 'artist_amazon_id': row[3]}
return None
except Exception as e:
logger.error(f"Error getting next item: {e}")
return None
finally:
if conn:
conn.close()
def _normalize_name(self, name: str) -> str:
name = name.lower().strip()
name = re.sub(r'\s+[-–—]\s+.*$', '', name)
name = re.sub(r'\s*\(.*?\)\s*', ' ', name)
name = re.sub(r'[^\w\s]', '', name)
name = re.sub(r'\s+', ' ', name).strip()
return name
def _name_matches(self, query_name: str, result_name: str) -> bool:
norm_query = self._normalize_name(query_name)
norm_result = self._normalize_name(result_name)
similarity = SequenceMatcher(None, norm_query, norm_result).ratio()
logger.debug(f"Name similarity: '{query_name}' vs '{result_name}' = {similarity:.2f}")
return similarity >= self.name_similarity_threshold
def _process_item(self, item: Dict[str, Any]):
try:
item_type = item['type']
item_id = item['id']
item_name = item['name']
logger.debug(f"Processing {item_type} #{item_id}: {item_name}")
if item_type == 'artist':
self._process_artist(item_id, item_name)
elif item_type == 'album':
self._process_album(item_id, item_name, item.get('artist', ''), item)
elif item_type == 'track':
self._process_track(item_id, item_name, item.get('artist', ''), item)
except Exception as e:
logger.error(f"Error processing {item['type']} #{item['id']}: {e}")
self.stats['errors'] += 1
try:
self._mark_status(item['type'], item['id'], 'error')
except Exception as e2:
logger.error(f"Error updating item status: {e2}")
def _get_existing_id(self, entity_type: str, entity_id: int) -> Optional[str]:
table_map = {'artist': 'artists', 'album': 'albums', 'track': 'tracks'}
table = table_map.get(entity_type)
if not table:
return None
conn = None
try:
conn = self.db._get_connection()
cursor = conn.cursor()
cursor.execute(f"SELECT amazon_id FROM {table} WHERE id = ?", (entity_id,))
row = cursor.fetchone()
return row[0] if row and row[0] else None
except Exception:
return None
finally:
if conn:
conn.close()
def _process_artist(self, artist_id: int, artist_name: str):
existing_id = self._get_existing_id('artist', artist_id)
if existing_id:
logger.debug(f"Preserving existing Amazon ID for artist '{artist_name}': {existing_id}")
return
results = self.client.search_artists(artist_name, limit=5)
if results:
result = results[0]
if self._name_matches(artist_name, result.name):
self._update_artist(artist_id, result)
self.stats['matched'] += 1
logger.info(f"Matched artist '{artist_name}' -> Amazon ID: {result.id}")
else:
self._mark_status('artist', artist_id, 'not_found')
self.stats['not_found'] += 1
logger.debug(f"Name mismatch for artist '{artist_name}' (got '{result.name}')")
else:
self._mark_status('artist', artist_id, 'not_found')
self.stats['not_found'] += 1
logger.debug(f"No match for artist '{artist_name}'")
def _refresh_album_via_stored_id(self, album_id, stored_id, api_data):
self._update_album(album_id, api_data, stored_id)
def _refresh_track_via_stored_id(self, track_id, stored_id, api_data):
self._update_track(track_id, api_data, stored_id)
def _process_album(self, album_id: int, album_name: str, artist_name: str, item: Dict[str, Any]):
if honor_stored_match(
db=self.db, entity_table='albums', entity_id=album_id,
id_column='amazon_id',
client_fetch_fn=lambda asin: self.client.get_album(asin, include_tracks=False),
on_match_fn=self._refresh_album_via_stored_id,
log_prefix='Amazon',
):
self.stats['matched'] += 1
return
query = f"{artist_name} {album_name}"
results = self.client.search_albums(query, limit=10)
if results:
result = results[0]
if self._name_matches(album_name, result.name):
full_album = None
if result.id:
try:
full_album = self.client.get_album(result.id, include_tracks=False)
except Exception as e:
logger.warning(f"Failed to fetch full album '{album_name}' (ASIN: {result.id}): {e}")
if full_album is None:
self._mark_status('album', album_id, 'error')
self.stats['errors'] += 1
logger.warning(f"Album '{album_name}' matched but full details unavailable, will retry")
return
self._update_album(album_id, full_album, result.id)
self.stats['matched'] += 1
logger.info(f"Matched album '{album_name}' -> Amazon ASIN: {result.id}")
else:
self._mark_status('album', album_id, 'not_found')
self.stats['not_found'] += 1
logger.debug(f"Name mismatch for album '{album_name}' (got '{result.name}')")
else:
self._mark_status('album', album_id, 'not_found')
self.stats['not_found'] += 1
logger.debug(f"No match for album '{album_name}'")
def _process_track(self, track_id: int, track_name: str, artist_name: str, item: Dict[str, Any]):
if honor_stored_match(
db=self.db, entity_table='tracks', entity_id=track_id,
id_column='amazon_id',
client_fetch_fn=self.client.get_track_details,
on_match_fn=self._refresh_track_via_stored_id,
log_prefix='Amazon',
):
self.stats['matched'] += 1
return
query = f"{artist_name} {track_name}"
results = self.client.search_tracks(query, limit=10)
if results:
result = results[0]
if self._name_matches(track_name, result.name):
full_track = None
if result.id:
try:
full_track = self.client.get_track_details(result.id)
except Exception as e:
logger.warning(f"Failed to fetch full track '{track_name}' (ASIN: {result.id}): {e}")
if full_track is None:
self._mark_status('track', track_id, 'error')
self.stats['errors'] += 1
logger.warning(f"Track '{track_name}' matched but full details unavailable, will retry")
return
self._update_track(track_id, full_track, result.id)
self.stats['matched'] += 1
logger.info(f"Matched track '{track_name}' -> Amazon ASIN: {result.id}")
else:
self._mark_status('track', track_id, 'not_found')
self.stats['not_found'] += 1
logger.debug(f"Name mismatch for track '{track_name}' (got '{result.name}')")
else:
self._mark_status('track', track_id, 'not_found')
self.stats['not_found'] += 1
logger.debug(f"No match for track '{track_name}'")
def _update_artist(self, artist_id: int, result):
"""Store Amazon metadata for an artist. ``result`` is an Artist dataclass."""
conn = None
try:
conn = self.db._get_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE artists SET
amazon_id = ?,
amazon_match_status = 'matched',
amazon_last_attempted = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (str(result.id), artist_id))
# Backfill thumb_url from album cover stand-in when artist has no image
image_url = result.image_url
if not image_url:
try:
image_url = self.client._get_artist_image_from_albums(result.id)
except Exception as exc:
logger.debug("Artist image from albums failed for %s: %s", result.id, exc)
if image_url:
cursor.execute("""
UPDATE artists SET thumb_url = ?
WHERE id = ? AND (thumb_url IS NULL OR thumb_url = '')
""", (image_url, artist_id))
conn.commit()
except Exception as e:
logger.error(f"Error updating artist #{artist_id} with Amazon data: {e}")
raise
finally:
if conn:
conn.close()
def _update_album(self, album_id: int, full_data: Dict[str, Any], asin: str):
"""Store Amazon metadata for an album. ``full_data`` is a get_album() dict."""
conn = None
try:
conn = self.db._get_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE albums SET
amazon_id = ?,
amazon_match_status = 'matched',
amazon_last_attempted = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (asin, album_id))
# Backfill label when missing
label = full_data.get('label')
if label:
cursor.execute("""
UPDATE albums SET label = ?
WHERE id = ? AND (label IS NULL OR label = '')
""", (label, album_id))
# Backfill thumb_url
images = full_data.get('images') or []
thumb_url = images[0].get('url') if images else None
if thumb_url:
cursor.execute("""
UPDATE albums SET thumb_url = ?
WHERE id = ? AND (thumb_url IS NULL OR thumb_url = '')
""", (thumb_url, album_id))
# Cache authoritative track count for completeness repair
total_tracks = full_data.get('total_tracks') or (
full_data.get('tracks', {}).get('total') if isinstance(full_data.get('tracks'), dict) else None
)
set_album_api_track_count(cursor, album_id, total_tracks)
conn.commit()
except Exception as e:
logger.error(f"Error updating album #{album_id} with Amazon data: {e}")
raise
finally:
if conn:
conn.close()
def _update_track(self, track_id: int, full_data: Dict[str, Any], asin: str):
"""Store Amazon metadata for a track. ``full_data`` is a get_track_details() dict."""
conn = None
try:
conn = self.db._get_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE tracks SET
amazon_id = ?,
amazon_match_status = 'matched',
amazon_last_attempted = CURRENT_TIMESTAMP
WHERE id = ?
""", (asin, track_id))
conn.commit()
except Exception as e:
logger.error(f"Error updating track #{track_id} with Amazon data: {e}")
raise
finally:
if conn:
conn.close()
def _mark_status(self, entity_type: str, entity_id: int, status: str):
table_map = {'artist': 'artists', 'album': 'albums', 'track': 'tracks'}
table = table_map.get(entity_type)
if not table:
logger.error(f"Unknown entity type: {entity_type}")
return
conn = None
try:
conn = self.db._get_connection()
cursor = conn.cursor()
cursor.execute(f"""
UPDATE {table} SET
amazon_match_status = ?,
amazon_last_attempted = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (status, entity_id))
conn.commit()
except Exception as e:
logger.error(f"Error marking {entity_type} #{entity_id} status: {e}")
finally:
if conn:
conn.close()
def _count_pending_items(self) -> int:
conn = None
try:
conn = self.db._get_connection()
cursor = conn.cursor()
self._ensure_amazon_schema(cursor)
cursor.execute("""
SELECT
(SELECT COUNT(*) FROM artists WHERE amazon_match_status IS NULL AND id IS NOT NULL) +
(SELECT COUNT(*) FROM albums WHERE amazon_match_status IS NULL AND id IS NOT NULL) +
(SELECT COUNT(*) FROM tracks WHERE amazon_match_status IS NULL AND id IS NOT NULL)
AS pending
""")
row = cursor.fetchone()
return row[0] if row else 0
except Exception as e:
logger.error(f"Error counting pending items: {e}")
return 0
finally:
if conn:
conn.close()
def _get_progress_breakdown(self) -> Dict[str, Dict[str, int]]:
conn = None
try:
conn = self.db._get_connection()
cursor = conn.cursor()
self._ensure_amazon_schema(cursor)
progress = {}
for table in ('artists', 'albums', 'tracks'):
cursor.execute(f"""
SELECT
COUNT(*) AS total,
SUM(CASE WHEN amazon_match_status IS NOT NULL THEN 1 ELSE 0 END) AS processed
FROM {table}
""")
row = cursor.fetchone()
if row:
total, processed = row[0], row[1] or 0
progress[table] = {
'matched': processed,
'total': total,
'percent': int((processed / total * 100) if total > 0 else 0),
}
return progress
except Exception as e:
logger.error(f"Error getting progress breakdown: {e}")
return {}
finally:
if conn:
conn.close()