import json
import re
import threading
import time
from contextlib import closing
from difflib import SequenceMatcher
from types import SimpleNamespace
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta

from utils.logging_config import get_logger
from database.music_database import MusicDatabase
from core.itunes_client import iTunesClient
from core.worker_utils import interruptible_sleep, set_album_api_track_count
from core.enrichment.manual_match_honoring import honor_stored_match

logger = get_logger("itunes_worker")


class iTunesWorker:
    """Background worker for enriching library artists, albums, and tracks with iTunes metadata.

    Uses the same smart cascading batch approach as SpotifyWorker:
    1. Search artist by name (1 API call)
    2. get_artist_albums once per matched artist -> match all DB albums locally
    3. get_album_tracks once per matched album -> match all DB tracks locally
    4. Fallback individual search for items whose parent wasn't matched

    iTunes _lookup() calls are NOT rate-limited, so batch operations are fast.
    Only _search() calls are rate-limited (~20/min, 3s between calls).
    """

    def __init__(self, database: MusicDatabase):
        """Create the worker around ``database``; the thread is not started here."""
        self.db = database
        self.client = iTunesClient()

        # Worker state
        self.running = False
        self.paused = False
        self.should_stop = False
        self.thread = None
        self._stop_event = threading.Event()

        # Current item being processed (for UI tooltip)
        self.current_item = None

        # Statistics
        self.stats = {
            'matched': 0,
            'not_found': 0,
            'pending': 0,
            'errors': 0
        }

        # Retry configuration
        self.retry_days = 30
        self.error_retry_days = 7

        # Name matching threshold
        self.name_similarity_threshold = 0.80

        # Rate limiting — iTunes search is ~20 calls/min (3s enforced by client),
        # but we add extra sleep between top-level items. Lookup is NOT rate-limited.
        self.inter_item_sleep = 3.5  # Between search items (artist/individual)
        self.batch_inter_item_sleep = 0.1  # Between local matches within a batch (lookup, not rate-limited)

        logger.info("iTunes background worker initialized")

    def start(self):
        """Start the daemon worker thread; no-op (with a warning) if already running."""
        if self.running:
            logger.warning("Worker already running")
            return

        self.running = True
        self.should_stop = False
        self._stop_event.clear()

        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()
        logger.info("iTunes background worker started")

    def stop(self):
        """Signal the worker thread to stop and wait up to one second for it."""
        if not self.running:
            return

        logger.info("Stopping iTunes worker...")
        self.should_stop = True
        self.running = False
        self._stop_event.set()

        if self.thread:
            self.thread.join(timeout=1)

        logger.info("iTunes worker stopped")

    def pause(self):
        """Pause processing; the thread keeps running but picks up no new items."""
        if not self.running:
            logger.warning("Worker not running, cannot pause")
            return
        self.paused = True
        logger.info("iTunes worker paused")

    def resume(self):
        """Resume processing after ``pause()``."""
        if not self.running:
            logger.warning("Worker not running, start it first")
            return
        self.paused = False
        logger.info("iTunes worker resumed")

    def get_stats(self) -> Dict[str, Any]:
        """Return a status snapshot for the UI.

        Refreshes ``stats['pending']`` from the database as a side effect.
        The result has the keys ``enabled``, ``running``, ``paused``, ``idle``,
        ``current_item``, ``stats`` (a copy) and ``progress``.
        """
        self.stats['pending'] = self._count_pending_items()
        progress = self._get_progress_breakdown()

        # A set ``running`` flag is not enough: the thread may have died.
        is_actually_running = self.running and (self.thread is not None and self.thread.is_alive())
        is_idle = (
            is_actually_running
            and not self.paused
            and self.stats['pending'] == 0
            and self.current_item is None
        )

        return {
            'enabled': True,
            'running': is_actually_running and not self.paused,
            'paused': self.paused,
            'idle': is_idle,
            'current_item': self.current_item,
            'stats': self.stats.copy(),
            'progress': progress
        }

    # ── Main loop ──────────────────────────────────────────────────────

    def _run(self):
        """Thread body: fetch the next pending item, process it, sleep, repeat.

        Runs until ``should_stop`` is set. Any exception escaping one
        iteration is logged and followed by a 5-second back-off.
        """
        logger.info("iTunes worker thread started")

        while not self.should_stop:
            try:
                if self.paused:
                    interruptible_sleep(self._stop_event, 1)
                    continue

                # No auth check needed — iTunes API requires no authentication
                self.current_item = None
                item = self._get_next_item()

                if not item:
                    logger.debug("No pending items, sleeping...")
                    interruptible_sleep(self._stop_event, 10)
                    continue

                self.current_item = item

                # Guard: skip items with None/NULL IDs to prevent infinite enrichment loops.
                # Compare against None explicitly so a legitimate id of 0 is not discarded.
                item_id = next(
                    (item[key] for key in ('id', 'artist_id', 'album_id') if item.get(key) is not None),
                    None,
                )
                if item_id is None:
                    logger.warning(
                        f"Skipping {item.get('type', 'unknown')} with NULL id: "
                        f"{item.get('name', '?')} — cannot mark status without an ID"
                    )
                    # The row cannot be updated, so it may be selected again; sleep
                    # so a repeat selection cannot turn into a busy loop.
                    interruptible_sleep(self._stop_event, self.inter_item_sleep)
                    continue

                self._process_item(item)

                # Sleep depends on item type — search items need more delay
                if item.get('type', '') in ('album_batch', 'track_batch'):
                    interruptible_sleep(self._stop_event, self.batch_inter_item_sleep)
                else:
                    interruptible_sleep(self._stop_event, self.inter_item_sleep)

            except Exception as e:
                logger.error(f"Error in worker loop: {e}")
                interruptible_sleep(self._stop_event, 5)

        self.current_item = None
        logger.info("iTunes worker thread finished")

    # ── Priority queue ─────────────────────────────────────────────────

    def _get_next_item(self) -> Optional[Dict[str, Any]]:
        """Return the next work item as a dict, or None when nothing is pending.

        Priority order: unattempted artists, album batches, track batches,
        individual albums, individual tracks, then stale ``not_found`` retries.
        Every item carries ``type`` and ``name``; the remaining keys depend on
        the type. Database errors are logged and reported as None.
        """
        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()

                # Priority 1: Unattempted artists
                cursor.execute("""
                    SELECT id, name
                    FROM artists
                    WHERE itunes_match_status IS NULL AND id IS NOT NULL
                    ORDER BY id ASC
                    LIMIT 1
                """)
                row = cursor.fetchone()
                if row:
                    return {'type': 'artist', 'id': row[0], 'name': row[1]}

                # Priority 2: Album batch — matched artist with unattempted albums
                cursor.execute("""
                    SELECT ar.id, ar.name, ar.itunes_artist_id
                    FROM artists ar
                    WHERE ar.itunes_match_status = 'matched'
                      AND ar.itunes_artist_id IS NOT NULL
                      AND EXISTS (
                          SELECT 1 FROM albums al
                          WHERE al.artist_id = ar.id
                            AND al.itunes_match_status IS NULL
                            AND al.id IS NOT NULL
                      )
                    ORDER BY ar.id ASC
                    LIMIT 1
                """)
                row = cursor.fetchone()
                if row:
                    return {
                        'type': 'album_batch',
                        'artist_id': row[0],
                        'artist_name': row[1],
                        'itunes_artist_id': row[2],
                        'name': f"Albums for {row[1]}"
                    }

                # Priority 3: Track batch — matched album with unattempted tracks
                cursor.execute("""
                    SELECT al.id, al.title, al.itunes_album_id, ar.name AS artist_name
                    FROM albums al
                    JOIN artists ar ON al.artist_id = ar.id
                    WHERE al.itunes_match_status = 'matched'
                      AND al.itunes_album_id IS NOT NULL
                      AND EXISTS (
                          SELECT 1 FROM tracks t
                          WHERE t.album_id = al.id
                            AND t.itunes_match_status IS NULL
                            AND t.id IS NOT NULL
                      )
                    ORDER BY al.id ASC
                    LIMIT 1
                """)
                row = cursor.fetchone()
                if row:
                    return {
                        'type': 'track_batch',
                        'album_id': row[0],
                        'album_name': row[1],
                        'itunes_album_id': row[2],
                        'artist_name': row[3],
                        'name': f"Tracks on {row[1]}"
                    }

                # Priority 4: Fallback individual albums (parent artist unmatched)
                cursor.execute("""
                    SELECT a.id, a.title, ar.name AS artist_name
                    FROM albums a
                    JOIN artists ar ON a.artist_id = ar.id
                    WHERE a.itunes_match_status IS NULL AND a.id IS NOT NULL
                    ORDER BY a.id ASC
                    LIMIT 1
                """)
                row = cursor.fetchone()
                if row:
                    return {'type': 'album_individual', 'id': row[0], 'name': row[1], 'artist': row[2]}

                # Priority 5: Fallback individual tracks (parent album unmatched)
                cursor.execute("""
                    SELECT t.id, t.title, ar.name AS artist_name
                    FROM tracks t
                    JOIN artists ar ON t.artist_id = ar.id
                    WHERE t.itunes_match_status IS NULL AND t.id IS NOT NULL
                    ORDER BY t.id ASC
                    LIMIT 1
                """)
                row = cursor.fetchone()
                if row:
                    return {'type': 'track_individual', 'id': row[0], 'name': row[1], 'artist': row[2]}

                # Priority 6: Retry stale not_found items only (errors don't auto-retry —
                # they require a user-triggered full refresh to prevent infinite retry loops)
                # NOTE(review): the cutoff is a naive local datetime compared with values
                # written by CURRENT_TIMESTAMP — confirm the timezone/format agree.
                not_found_cutoff = datetime.now() - timedelta(days=self.retry_days)

                cursor.execute("""
                    SELECT id, name
                    FROM artists
                    WHERE itunes_match_status = 'not_found' AND itunes_last_attempted < ?
                    ORDER BY itunes_last_attempted ASC
                    LIMIT 1
                """, (not_found_cutoff,))
                row = cursor.fetchone()
                if row:
                    return {'type': 'artist', 'id': row[0], 'name': row[1]}

                cursor.execute("""
                    SELECT a.id, a.title, ar.name AS artist_name
                    FROM albums a
                    JOIN artists ar ON a.artist_id = ar.id
                    WHERE a.itunes_match_status = 'not_found' AND a.itunes_last_attempted < ?
                    ORDER BY a.itunes_last_attempted ASC
                    LIMIT 1
                """, (not_found_cutoff,))
                row = cursor.fetchone()
                if row:
                    return {'type': 'album_individual', 'id': row[0], 'name': row[1], 'artist': row[2]}

                cursor.execute("""
                    SELECT t.id, t.title, ar.name AS artist_name
                    FROM tracks t
                    JOIN artists ar ON t.artist_id = ar.id
                    WHERE t.itunes_match_status = 'not_found' AND t.itunes_last_attempted < ?
                    ORDER BY t.itunes_last_attempted ASC
                    LIMIT 1
                """, (not_found_cutoff,))
                row = cursor.fetchone()
                if row:
                    return {'type': 'track_individual', 'id': row[0], 'name': row[1], 'artist': row[2]}

                return None

        except Exception as e:
            logger.error(f"Error getting next item: {e}")
            return None

    # ── Dispatcher ─────────────────────────────────────────────────────

    def _process_item(self, item: Dict[str, Any]):
        """Dispatch ``item`` to the handler for its ``type``.

        If the handler raises, the error is logged and counted, and the item
        (or, for batches, its still-unattempted children) is marked ``error``
        so it is not selected again as unattempted.
        """
        try:
            item_type = item['type']
            logger.debug(f"Processing {item_type}: {item.get('name', '')}")

            if item_type == 'artist':
                self._process_artist(item)
            elif item_type == 'album_batch':
                self._process_album_batch(item)
            elif item_type == 'track_batch':
                self._process_track_batch(item)
            elif item_type == 'album_individual':
                self._process_album_individual(item)
            elif item_type == 'track_individual':
                self._process_track_individual(item)

        except Exception as e:
            logger.error(f"Error processing {item.get('type')} '{item.get('name', '')}': {e}")
            self.stats['errors'] += 1
            try:
                itype = item.get('type', '')
                if itype == 'artist':
                    self._mark_status('artist', item['id'], 'error')
                elif itype == 'album_individual':
                    self._mark_status('album', item['id'], 'error')
                elif itype == 'track_individual':
                    self._mark_status('track', item['id'], 'error')
                elif itype == 'album_batch':
                    self._mark_artist_albums_error(item['artist_id'])
                elif itype == 'track_batch':
                    self._mark_album_tracks_error(item['album_id'])
            except Exception as e2:
                logger.error(f"Error updating item status: {e2}")

    # ── Artist processing ──────────────────────────────────────────────

    def _get_existing_id(self, entity_type: str, entity_id: int) -> Optional[str]:
        """Check if an entity already has an itunes_artist_id/itunes_album_id/itunes_track_id.

        Returns the stored value, or None when it is absent/empty, the entity
        type is unknown, or the lookup fails (best-effort; failure is logged
        at debug level).
        """
        col_map = {'artist': 'itunes_artist_id', 'album': 'itunes_album_id', 'track': 'itunes_track_id'}
        table_map = {'artist': 'artists', 'album': 'albums', 'track': 'tracks'}
        col = col_map.get(entity_type)
        table = table_map.get(entity_type)
        if not col or not table:
            return None

        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()
                # col/table come from the fixed maps above, never from user input.
                cursor.execute(f"SELECT {col} FROM {table} WHERE id = ?", (entity_id,))
                row = cursor.fetchone()
                return row[0] if row and row[0] else None
        except Exception as e:
            logger.debug("existing iTunes id lookup failed for %s #%s: %s", entity_type, entity_id, e)
            return None

    def _process_artist(self, item: Dict[str, Any]):
        """Match one DB artist to iTunes by name search (a rate-limited call).

        An artist that already has a stored iTunes ID keeps that ID and is
        marked ``matched`` without searching.
        """
        artist_id = item['id']
        artist_name = item['name']

        existing_id = self._get_existing_id('artist', artist_id)
        if existing_id:
            logger.debug(f"Preserving existing iTunes ID for artist '{artist_name}': {existing_id}")
            # Record the outcome: if the status stayed NULL, _get_next_item would
            # select this same artist on every iteration and starve the queue.
            self._mark_status('artist', artist_id, 'matched')
            return

        results = self.client.search_artists(artist_name, limit=5)
        if not results:
            self._mark_status('artist', artist_id, 'not_found')
            self.stats['not_found'] += 1
            logger.debug(f"No iTunes results for artist '{artist_name}'")
            return

        for artist_obj in results:
            if self._name_matches(artist_name, artist_obj.name):
                if not self._is_itunes_id(artist_obj.id):
                    logger.warning(f"Rejecting non-iTunes ID '{artist_obj.id}' for artist '{artist_name}'")
                    self._mark_status('artist', artist_id, 'error')
                    self.stats['errors'] += 1
                    return
                self._update_artist(artist_id, artist_obj)
                self.stats['matched'] += 1
                logger.info(f"Matched artist '{artist_name}' -> iTunes ID: {artist_obj.id}")
                return

        self._mark_status('artist', artist_id, 'not_found')
        self.stats['not_found'] += 1
        logger.debug(f"Name mismatch for artist '{artist_name}' (best: '{results[0].name}')")

    # ── Album batch processing ─────────────────────────────────────────

    def _process_album_batch(self, item: Dict[str, Any]):
        """Match all unattempted DB albums of one matched artist in a single lookup.

        A failed or non-iTunes response marks the artist's unattempted albums
        ``error``; an empty response marks them ``not_found``.
        """
        artist_id = item['artist_id']
        itunes_artist_id = item['itunes_artist_id']
        artist_name = item['artist_name']

        # 1 lookup call (NOT rate-limited): get all albums for this artist
        try:
            itunes_albums = self.client.get_artist_albums(
                itunes_artist_id, album_type='album,single', limit=50
            )
        except Exception as e:
            logger.error(f"Failed to get iTunes albums for artist '{artist_name}': {e}")
            self._mark_artist_albums_error(artist_id)
            self.stats['errors'] += 1
            return

        if not itunes_albums:
            logger.debug(f"No iTunes albums for artist '{artist_name}'")
            self._mark_artist_albums_not_found(artist_id)
            return

        # Validate that we got iTunes albums, not some other format
        if not self._is_itunes_id(itunes_albums[0].id):
            logger.warning(f"Rejecting album batch for '{artist_name}': got non-iTunes IDs")
            self._mark_artist_albums_error(artist_id)
            self.stats['errors'] += 1
            return

        db_albums = self._get_unmatched_albums_for_artist(artist_id)
        if not db_albums:
            return

        matched_count = 0
        for db_album in db_albums:
            db_id, db_title = db_album['id'], db_album['title']

            # First iTunes album whose name is similar enough wins.
            best_match = next(
                (it_album for it_album in itunes_albums if self._name_matches(db_title, it_album.name)),
                None,
            )

            if best_match:
                self._update_album(db_id, best_match)
                self.stats['matched'] += 1
                matched_count += 1
                logger.info(f"Batch matched album '{db_title}' -> iTunes ID: {best_match.id}")
            else:
                self._mark_status('album', db_id, 'not_found')
                self.stats['not_found'] += 1

            interruptible_sleep(self._stop_event, self.batch_inter_item_sleep)

        logger.info(f"Album batch for '{artist_name}': {matched_count}/{len(db_albums)} matched")

    # ── Track batch processing ─────────────────────────────────────────

    def _process_track_batch(self, item: Dict[str, Any]):
        """Match all unattempted DB tracks of one matched album in a single lookup.

        Each DB track is matched first by track number plus name, then by name
        alone. A failed or non-iTunes response marks the album's unattempted
        tracks ``error``; an empty response marks them ``not_found``.
        """
        album_id = item['album_id']
        itunes_album_id = item['itunes_album_id']
        album_name = item['album_name']

        # 1 lookup call (NOT rate-limited): get all tracks for this album
        try:
            result = self.client.get_album_tracks(itunes_album_id)
        except Exception as e:
            logger.error(f"Failed to get iTunes tracks for album '{album_name}': {e}")
            self._mark_album_tracks_error(album_id)
            self.stats['errors'] += 1
            return

        if not result or not result.get('items'):
            logger.debug(f"No iTunes tracks for album '{album_name}'")
            self._mark_album_tracks_not_found(album_id)
            return

        itunes_tracks = result['items']

        # Validate that we got iTunes tracks
        if not self._is_itunes_id(str(itunes_tracks[0].get('id', ''))):
            logger.warning(f"Rejecting track batch for '{album_name}': got non-iTunes IDs")
            self._mark_album_tracks_error(album_id)
            self.stats['errors'] += 1
            return

        db_tracks = self._get_unmatched_tracks_for_album(album_id)
        if not db_tracks:
            return

        matched_count = 0
        for db_track in db_tracks:
            db_id = db_track['id']
            db_title = db_track['title']
            db_track_number = db_track.get('track_number')

            best_match = None

            # Strategy A: track_number match + name verification
            if db_track_number:
                for it_track in itunes_tracks:
                    it_num = it_track.get('track_number')
                    if it_num and it_num == db_track_number:
                        if self._name_matches(db_title, it_track.get('name', '')):
                            best_match = it_track
                            break

            # Strategy B: pure name match fallback
            if not best_match:
                for it_track in itunes_tracks:
                    if self._name_matches(db_title, it_track.get('name', '')):
                        best_match = it_track
                        break

            if best_match:
                self._update_track(db_id, best_match)
                self.stats['matched'] += 1
                matched_count += 1
                logger.info(f"Batch matched track '{db_title}' -> iTunes ID: {best_match.get('id')}")
            else:
                self._mark_status('track', db_id, 'not_found')
                self.stats['not_found'] += 1

            interruptible_sleep(self._stop_event, self.batch_inter_item_sleep)

        logger.info(f"Track batch for '{album_name}': {matched_count}/{len(db_tracks)} matched")

    # ── Individual fallback processing ─────────────────────────────────

    def _refresh_album_via_stored_id(self, album_id, stored_id, api_album_dict):
        """Issue #501 callback. Convert ``client.get_album()`` dict into the
        Album-shaped object ``_update_album`` expects, then call it.
        Preserves the manual match — never overwrites the stored ID with
        a different name-search result."""
        images = api_album_dict.get('images') or []
        image_url = ''
        if images and isinstance(images[0], dict):
            image_url = images[0].get('url', '') or ''
        adapter = SimpleNamespace(
            id=api_album_dict.get('id') or stored_id,
            name=api_album_dict.get('name', ''),
            image_url=image_url,
            album_type=api_album_dict.get('album_type', 'album'),
            release_date=api_album_dict.get('release_date', ''),
            total_tracks=api_album_dict.get('total_tracks', 0),
        )
        self._update_album(album_id, adapter)

    def _refresh_track_via_stored_id(self, track_id, stored_id, api_track_dict):
        """Track-level callback — track update only writes ID + status,
        no metadata backfill, so the dict shape is irrelevant beyond
        carrying the stored ID through."""
        adapter = SimpleNamespace(id=api_track_dict.get('id') or stored_id)
        self._update_track_from_search(track_id, adapter)

    def _process_album_individual(self, item: Dict[str, Any]):
        """Match one DB album by search (a rate-limited call).

        A stored manual match is honored first via ``honor_stored_match``;
        only when that returns falsy is a name search performed.
        """
        album_id = item['id']
        album_name = item['name']
        artist_name = item.get('artist', '')

        # Issue #501: honor manual matches (see SpotifyWorker for full
        # explanation — same pattern across every per-source worker).
        if honor_stored_match(
            db=self.db,
            entity_table='albums',
            entity_id=album_id,
            id_column='itunes_album_id',
            client_fetch_fn=self.client.get_album,
            on_match_fn=self._refresh_album_via_stored_id,
            log_prefix='iTunes',
        ):
            self.stats['matched'] += 1
            return

        query = f"{artist_name} {album_name}" if artist_name else album_name
        results = self.client.search_albums(query, limit=5)
        if not results:
            self._mark_status('album', album_id, 'not_found')
            self.stats['not_found'] += 1
            logger.debug(f"No iTunes results for album '{album_name}'")
            return

        for album_obj in results:
            if self._name_matches(album_name, album_obj.name):
                if not self._is_itunes_id(album_obj.id):
                    logger.warning(f"Rejecting non-iTunes ID '{album_obj.id}' for album '{album_name}'")
                    self._mark_status('album', album_id, 'error')
                    self.stats['errors'] += 1
                    return
                self._update_album(album_id, album_obj)
                self.stats['matched'] += 1
                logger.info(f"Matched album '{album_name}' -> iTunes ID: {album_obj.id}")
                return

        self._mark_status('album', album_id, 'not_found')
        self.stats['not_found'] += 1
        logger.debug(f"Name mismatch for album '{album_name}'")

    def _process_track_individual(self, item: Dict[str, Any]):
        """Match one DB track by search (a rate-limited call).

        A stored manual match is honored first via ``honor_stored_match``;
        only when that returns falsy is a name search performed.
        """
        track_id = item['id']
        track_name = item['name']
        artist_name = item.get('artist', '')

        # Issue #501: honor manual matches.
        if honor_stored_match(
            db=self.db,
            entity_table='tracks',
            entity_id=track_id,
            id_column='itunes_track_id',
            client_fetch_fn=self.client.get_track_details,
            on_match_fn=self._refresh_track_via_stored_id,
            log_prefix='iTunes',
        ):
            self.stats['matched'] += 1
            return

        query = f"{artist_name} {track_name}" if artist_name else track_name
        results = self.client.search_tracks(query, limit=5)
        if not results:
            self._mark_status('track', track_id, 'not_found')
            self.stats['not_found'] += 1
            logger.debug(f"No iTunes results for track '{track_name}'")
            return

        for track_obj in results:
            if self._name_matches(track_name, track_obj.name):
                if not self._is_itunes_id(track_obj.id):
                    logger.warning(f"Rejecting non-iTunes ID '{track_obj.id}' for track '{track_name}'")
                    self._mark_status('track', track_id, 'error')
                    self.stats['errors'] += 1
                    return
                self._update_track_from_search(track_id, track_obj)
                self.stats['matched'] += 1
                logger.info(f"Matched track '{track_name}' -> iTunes ID: {track_obj.id}")
                return

        self._mark_status('track', track_id, 'not_found')
        self.stats['not_found'] += 1
        logger.debug(f"Name mismatch for track '{track_name}'")

    # ── DB update methods ──────────────────────────────────────────────

    def _update_artist(self, artist_id: int, artist_obj):
        """Store iTunes metadata for an artist (from Artist dataclass).

        Writes the iTunes ID and ``matched`` status, then backfills
        ``thumb_url`` and ``genres`` only where the row has none.
        Logs and re-raises on any failure.
        """
        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()

                cursor.execute("""
                    UPDATE artists SET
                        itunes_artist_id = ?,
                        itunes_match_status = 'matched',
                        itunes_last_attempted = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (str(artist_obj.id), artist_id))

                # Backfill thumb_url if empty
                if artist_obj.image_url:
                    cursor.execute("""
                        UPDATE artists SET thumb_url = ?
                        WHERE id = ? AND (thumb_url IS NULL OR thumb_url = '')
                    """, (artist_obj.image_url, artist_id))

                # Backfill genres if empty
                if artist_obj.genres:
                    from core.genre_filter import filter_genres
                    from config.settings import config_manager as _cfg
                    _filtered = filter_genres(list(artist_obj.genres), _cfg)
                    if _filtered:
                        cursor.execute("""
                            UPDATE artists SET genres = ?
                            WHERE id = ? AND (genres IS NULL OR genres = '' OR genres = '[]')
                        """, (json.dumps(_filtered), artist_id))

                conn.commit()

        except Exception as e:
            logger.error(f"Error updating artist #{artist_id} with iTunes data: {e}")
            raise

    def _update_album(self, album_id: int, album_obj):
        """Store iTunes metadata for an album (from Album dataclass).

        Writes the iTunes ID and ``matched`` status, backfills ``thumb_url``,
        ``record_type`` and ``year`` only where the row has none, and caches
        the API track count. Logs and re-raises on any failure.
        """
        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()

                cursor.execute("""
                    UPDATE albums SET
                        itunes_album_id = ?,
                        itunes_match_status = 'matched',
                        itunes_last_attempted = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (str(album_obj.id), album_id))

                # Backfill thumb_url if empty
                if album_obj.image_url:
                    cursor.execute("""
                        UPDATE albums SET thumb_url = ?
                        WHERE id = ? AND (thumb_url IS NULL OR thumb_url = '')
                    """, (album_obj.image_url, album_id))

                # Backfill record_type if empty
                if album_obj.album_type:
                    cursor.execute("""
                        UPDATE albums SET record_type = ?
                        WHERE id = ? AND (record_type IS NULL OR record_type = '')
                    """, (album_obj.album_type, album_id))

                # Backfill year from release_date if empty
                if album_obj.release_date:
                    year = album_obj.release_date[:4] if len(album_obj.release_date) >= 4 else None
                    if year and year.isdigit():
                        cursor.execute("""
                            UPDATE albums SET year = ?
                            WHERE id = ? AND (year IS NULL OR year = '' OR year = '0')
                        """, (year, album_id))

                # Cache the authoritative expected track count for the Album
                # Completeness repair job (see set_album_api_track_count docstring).
                set_album_api_track_count(cursor, album_id, getattr(album_obj, 'total_tracks', 0))

                conn.commit()

        except Exception as e:
            logger.error(f"Error updating album #{album_id} with iTunes data: {e}")
            raise

    def _update_track(self, track_id: int, track_data: Dict[str, Any]):
        """Store iTunes metadata for a track (from get_album_tracks dict).

        Writes the iTunes ID and ``matched`` status, and backfills ``explicit``
        only where the row has NULL. Logs and re-raises on any failure.
        """
        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()

                itunes_id = str(track_data.get('id', ''))
                cursor.execute("""
                    UPDATE tracks SET
                        itunes_track_id = ?,
                        itunes_match_status = 'matched',
                        itunes_last_attempted = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (itunes_id, track_id))

                # Backfill explicit flag
                if 'explicit' in track_data:
                    explicit_val = 1 if track_data['explicit'] else 0
                    cursor.execute("""
                        UPDATE tracks SET explicit = ?
                        WHERE id = ? AND explicit IS NULL
                    """, (explicit_val, track_id))

                conn.commit()

        except Exception as e:
            logger.error(f"Error updating track #{track_id} with iTunes data: {e}")
            raise

    def _update_track_from_search(self, track_id: int, track_obj):
        """Store iTunes metadata for a track (from Track dataclass, individual search).

        Writes only the iTunes ID and ``matched`` status. Logs and re-raises
        on any failure.
        """
        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()

                cursor.execute("""
                    UPDATE tracks SET
                        itunes_track_id = ?,
                        itunes_match_status = 'matched',
                        itunes_last_attempted = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (str(track_obj.id), track_id))

                conn.commit()

        except Exception as e:
            logger.error(f"Error updating track #{track_id} with iTunes data: {e}")
            raise

    # ── Batch helpers ──────────────────────────────────────────────────

    def _get_unmatched_albums_for_artist(self, artist_id: int) -> List[Dict[str, Any]]:
        """Return ``{'id', 'title'}`` dicts for the artist's unattempted albums.

        Returns an empty list (after logging) on a database error.
        """
        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()
                # The id filter mirrors the track helper and the queue queries.
                cursor.execute("""
                    SELECT id, title
                    FROM albums
                    WHERE artist_id = ? AND itunes_match_status IS NULL AND id IS NOT NULL
                    ORDER BY id ASC
                """, (artist_id,))
                return [{'id': row[0], 'title': row[1]} for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error getting unmatched albums for artist #{artist_id}: {e}")
            return []

    def _get_unmatched_tracks_for_album(self, album_id: int) -> List[Dict[str, Any]]:
        """Return ``{'id', 'title', 'track_number'}`` dicts for the album's unattempted tracks.

        Returns an empty list (after logging) on a database error.
        """
        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT id, title, track_number
                    FROM tracks
                    WHERE album_id = ? AND itunes_match_status IS NULL AND id IS NOT NULL
                    ORDER BY id ASC
                """, (album_id,))
                return [
                    {'id': row[0], 'title': row[1], 'track_number': row[2]}
                    for row in cursor.fetchall()
                ]
        except Exception as e:
            logger.error(f"Error getting unmatched tracks for album #{album_id}: {e}")
            return []

    def _mark_artist_albums_error(self, artist_id: int):
        """Mark every still-unattempted album of the artist as ``error`` (errors are logged, not raised)."""
        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    UPDATE albums SET
                        itunes_match_status = 'error',
                        itunes_last_attempted = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE artist_id = ? AND itunes_match_status IS NULL
                """, (artist_id,))
                conn.commit()
        except Exception as e:
            logger.error(f"Error bulk-marking albums for artist #{artist_id}: {e}")

    def _mark_artist_albums_not_found(self, artist_id: int):
        """Mark every still-unattempted album of the artist as ``not_found`` (errors are logged, not raised)."""
        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    UPDATE albums SET
                        itunes_match_status = 'not_found',
                        itunes_last_attempted = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE artist_id = ? AND itunes_match_status IS NULL
                """, (artist_id,))
                conn.commit()
        except Exception as e:
            logger.error(f"Error bulk-marking albums not_found for artist #{artist_id}: {e}")

    def _mark_album_tracks_error(self, album_id: int):
        """Mark every still-unattempted track of the album as ``error`` (errors are logged, not raised)."""
        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    UPDATE tracks SET
                        itunes_match_status = 'error',
                        itunes_last_attempted = CURRENT_TIMESTAMP
                    WHERE album_id = ? AND itunes_match_status IS NULL
                """, (album_id,))
                conn.commit()
        except Exception as e:
            logger.error(f"Error bulk-marking tracks for album #{album_id}: {e}")

    def _mark_album_tracks_not_found(self, album_id: int):
        """Mark every still-unattempted track of the album as ``not_found`` (errors are logged, not raised)."""
        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    UPDATE tracks SET
                        itunes_match_status = 'not_found',
                        itunes_last_attempted = CURRENT_TIMESTAMP
                    WHERE album_id = ? AND itunes_match_status IS NULL
                """, (album_id,))
                conn.commit()
        except Exception as e:
            logger.error(f"Error bulk-marking tracks not_found for album #{album_id}: {e}")

    # ── Status / counting ──────────────────────────────────────────────

    def _mark_status(self, entity_type: str, entity_id: int, status: str):
        """Set ``itunes_match_status`` (and the attempt timestamps) on one row.

        ``entity_type`` is 'artist', 'album' or 'track'; any other value is
        ignored. Database errors are logged, not raised.
        """
        table_map = {'artist': 'artists', 'album': 'albums', 'track': 'tracks'}
        table = table_map.get(entity_type)
        if not table:
            return

        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()
                # ``table`` comes from the fixed map above, never from user input.
                cursor.execute(f"""
                    UPDATE {table} SET
                        itunes_match_status = ?,
                        itunes_last_attempted = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (status, entity_id))
                conn.commit()
        except Exception as e:
            logger.error(f"Error marking {entity_type} #{entity_id} status: {e}")

    def _count_pending_items(self) -> int:
        """Return the number of artists + albums + tracks never attempted (0 on error)."""
        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT
                        (SELECT COUNT(*) FROM artists WHERE itunes_match_status IS NULL AND id IS NOT NULL) +
                        (SELECT COUNT(*) FROM albums WHERE itunes_match_status IS NULL AND id IS NOT NULL) +
                        (SELECT COUNT(*) FROM tracks WHERE itunes_match_status IS NULL AND id IS NOT NULL)
                        AS pending
                """)
                row = cursor.fetchone()
                return row[0] if row else 0
        except Exception as e:
            logger.error(f"Error counting pending items: {e}")
            return 0

    def _get_progress_breakdown(self) -> Dict[str, Dict[str, int]]:
        """Return per-table progress keyed by 'artists', 'albums' and 'tracks'.

        Each value holds ``matched``, ``total`` and ``percent``. Note that
        ``matched`` counts every row with a non-NULL status (i.e. processed
        rows, including not_found/error). Returns ``{}`` on a database error.
        """
        try:
            with closing(self.db._get_connection()) as conn:
                cursor = conn.cursor()
                progress = {}

                for table in ('artists', 'albums', 'tracks'):
                    cursor.execute(f"""
                        SELECT
                            COUNT(*) AS total,
                            SUM(CASE WHEN itunes_match_status IS NOT NULL THEN 1 ELSE 0 END) AS processed
                        FROM {table}
                    """)
                    row = cursor.fetchone()
                    if row:
                        # SUM() yields NULL on an empty table, hence the ``or 0``.
                        total, processed = row[0], row[1] or 0
                        progress[table] = {
                            'matched': processed,
                            'total': total,
                            'percent': int((processed / total * 100) if total > 0 else 0)
                        }

                return progress
        except Exception as e:
            logger.error(f"Error getting progress breakdown: {e}")
            return {}

    # ── ID validation ────────────────────────────────────────────────

    def _is_itunes_id(self, id_str: str) -> bool:
        """iTunes IDs are purely numeric. Spotify IDs are alphanumeric (contain letters).
        Reject alphanumeric IDs to prevent Spotify contamination of itunes_* columns."""
        if not id_str:
            return False
        return str(id_str).isdigit()

    # ── Name matching ──────────────────────────────────────────────────

    def _normalize_name(self, name: str) -> str:
        """Return a comparison key for ``name``.

        Lowercases, drops everything after a spaced dash (" - ..."), removes
        parenthesised segments and punctuation, and collapses whitespace.
        ``None`` is treated as an empty string.
        """
        name = (name or '').lower().strip()
        name = re.sub(r'\s+[-–—]\s+.*$', '', name)
        name = re.sub(r'\s*\(.*?\)\s*', ' ', name)
        name = re.sub(r'[^\w\s]', '', name)
        name = re.sub(r'\s+', ' ', name).strip()
        return name

    def _name_matches(self, query_name: str, result_name: str) -> bool:
        """Return True when the two names are similar enough to count as a match.

        Similarity is the ``SequenceMatcher`` ratio of the normalized names,
        compared against ``name_similarity_threshold``. A missing or blank
        name never matches.
        """
        raw_query = (query_name or '').strip()
        raw_result = (result_name or '').strip()
        if not raw_query or not raw_result:
            return False

        norm_query = self._normalize_name(raw_query)
        norm_result = self._normalize_name(raw_result)
        if not norm_query and not norm_result:
            # Both names consist only of stripped material (e.g. "(Intro)" or
            # "!!!"). Two empty strings would score 1.0 and match anything, so
            # compare the raw lowercased names instead.
            norm_query, norm_result = raw_query.lower(), raw_result.lower()

        similarity = SequenceMatcher(None, norm_query, norm_result).ratio()
        logger.debug(f"Name similarity: '{query_name}' vs '{result_name}' = {similarity:.2f}")
        return similarity >= self.name_similarity_threshold