@ -10,20 +10,20 @@ logger = get_logger("musicbrainz_worker")
class MusicBrainzWorker :
class MusicBrainzWorker :
""" Background worker for enriching library with MusicBrainz IDs """
""" Background worker for enriching library with MusicBrainz IDs """
def __init__ ( self , database : MusicDatabase , app_name : str = " SoulSync " , app_version : str = " 1.0 " , contact_email : str = " " ) :
def __init__ ( self , database : MusicDatabase , app_name : str = " SoulSync " , app_version : str = " 1.0 " , contact_email : str = " " ) :
self . db = database
self . db = database
self . mb_service = MusicBrainzService ( database , app_name , app_version , contact_email )
self . mb_service = MusicBrainzService ( database , app_name , app_version , contact_email )
# Worker state
# Worker state
self . running = False
self . running = False
self . paused = False
self . paused = False
self . should_stop = False
self . should_stop = False
self . thread = None
self . thread = None
# Current item being processed (for UI tooltip)
# Current item being processed (for UI tooltip)
self . current_item = None
self . current_item = None
# Statistics
# Statistics
self . stats = {
self . stats = {
' matched ' : 0 ,
' matched ' : 0 ,
@ -31,67 +31,67 @@ class MusicBrainzWorker:
' pending ' : 0 ,
' pending ' : 0 ,
' errors ' : 0
' errors ' : 0
}
}
# Retry configuration
# Retry configuration
self . retry_days = 30 # Retry 'not_found' items after 30 days
self . retry_days = 30 # Retry 'not_found' items after 30 days
logger . info ( " MusicBrainz background worker initialized " )
logger . info ( " MusicBrainz background worker initialized " )
def start ( self ) :
def start ( self ) :
""" Start the background worker """
""" Start the background worker """
if self . running :
if self . running :
logger . warning ( " Worker already running " )
logger . warning ( " Worker already running " )
return
return
self . running = True
self . running = True
self . should_stop = False
self . should_stop = False
self . thread = threading . Thread ( target = self . _run , daemon = True )
self . thread = threading . Thread ( target = self . _run , daemon = True )
self . thread . start ( )
self . thread . start ( )
logger . info ( " MusicBrainz background worker started " )
logger . info ( " MusicBrainz background worker started " )
def stop ( self ) :
def stop ( self ) :
""" Stop the background worker """
""" Stop the background worker """
if not self . running :
if not self . running :
return
return
logger . info ( " Stopping MusicBrainz worker... " )
logger . info ( " Stopping MusicBrainz worker... " )
self . should_stop = True
self . should_stop = True
self . running = False
self . running = False
if self . thread :
if self . thread :
self . thread . join ( timeout = 5 )
self . thread . join ( timeout = 5 )
logger . info ( " Music Brainz worker stopped " )
logger . info ( " Music Brainz worker stopped " )
def pause ( self ) :
def pause ( self ) :
""" Pause the worker """
""" Pause the worker """
if not self . running :
if not self . running :
logger . warning ( " Worker not running, cannot pause " )
logger . warning ( " Worker not running, cannot pause " )
return
return
self . paused = True
self . paused = True
logger . info ( " MusicBrainz worker paused " )
logger . info ( " MusicBrainz worker paused " )
def resume ( self ) :
def resume ( self ) :
""" Resume the worker """
""" Resume the worker """
if not self . running :
if not self . running :
logger . warning ( " Worker not running, start it first " )
logger . warning ( " Worker not running, start it first " )
return
return
self . paused = False
self . paused = False
logger . info ( " MusicBrainz worker resumed " )
logger . info ( " MusicBrainz worker resumed " )
def get_stats ( self ) - > Dict [ str , Any ] :
def get_stats ( self ) - > Dict [ str , Any ] :
""" Get current statistics """
""" Get current statistics """
# Update pending count
# Update pending count
self . stats [ ' pending ' ] = self . _count_pending_items ( )
self . stats [ ' pending ' ] = self . _count_pending_items ( )
# Get progress breakdown by entity type
# Get progress breakdown by entity type
progress = self . _get_progress_breakdown ( )
progress = self . _get_progress_breakdown ( )
# Check if thread is actually alive (in case it crashed)
# Check if thread is actually alive (in case it crashed)
is_actually_running = self . running and ( self . thread is not None and self . thread . is_alive ( ) )
is_actually_running = self . running and ( self . thread is not None and self . thread . is_alive ( ) )
return {
return {
' enabled ' : True ,
' enabled ' : True ,
' running ' : is_actually_running and not self . paused ,
' running ' : is_actually_running and not self . paused ,
@ -100,53 +100,53 @@ class MusicBrainzWorker:
' stats ' : self . stats . copy ( ) ,
' stats ' : self . stats . copy ( ) ,
' progress ' : progress
' progress ' : progress
}
}
def _run ( self ) :
def _run ( self ) :
""" Main worker loop """
""" Main worker loop """
logger . info ( " MusicBrainz worker thread started " )
logger . info ( " MusicBrainz worker thread started " )
while not self . should_stop :
while not self . should_stop :
try :
try :
# Check if paused
# Check if paused
if self . paused :
if self . paused :
time . sleep ( 1 )
time . sleep ( 1 )
continue
continue
# Clear previous item before getting next
# Clear previous item before getting next
self . current_item = None
self . current_item = None
# Get next item to process
# Get next item to process
item = self . _get_next_item ( )
item = self . _get_next_item ( )
if not item :
if not item :
# No more items - sleep for a bit
# No more items - sleep for a bit
logger . debug ( " No pending items, sleeping... " )
logger . debug ( " No pending items, sleeping... " )
time . sleep ( 10 )
time . sleep ( 10 )
continue
continue
# Set current item for UI tracking
# Set current item for UI tracking
self . current_item = item
self . current_item = item
# Process the item
# Process the item
self . _process_item ( item )
self . _process_item ( item )
# Keep current_item set during sleep so UI can see what was just processed
# Keep current_item set during sleep so UI can see what was just processed
# Rate limit: 1 request per second
# Rate limit: 1 request per second
time . sleep ( 1 )
time . sleep ( 1 )
except Exception as e :
except Exception as e :
logger . error ( f " Error in worker loop: { e } " )
logger . error ( f " Error in worker loop: { e } " )
time . sleep ( 5 ) # Back off on errors
time . sleep ( 5 ) # Back off on errors
logger . info ( " MusicBrainz worker thread finished " )
logger . info ( " MusicBrainz worker thread finished " )
def _get_next_item ( self ) - > Optional [ Dict [ str , Any ] ] :
def _get_next_item ( self ) - > Optional [ Dict [ str , Any ] ] :
""" Get next item to process from priority queue """
""" Get next item to process from priority queue """
conn = None
conn = None
try :
try :
conn = self . db . _get_connection ( )
conn = self . db . _get_connection ( )
cursor = conn . cursor ( )
cursor = conn . cursor ( )
# Priority 1: Unattempted artists
# Priority 1: Unattempted artists
cursor . execute ( """
cursor . execute ( """
SELECT id , name
SELECT id , name
@ -158,7 +158,7 @@ class MusicBrainzWorker:
row = cursor . fetchone ( )
row = cursor . fetchone ( )
if row :
if row :
return { ' type ' : ' artist ' , ' id ' : row [ 0 ] , ' name ' : row [ 1 ] }
return { ' type ' : ' artist ' , ' id ' : row [ 0 ] , ' name ' : row [ 1 ] }
# Priority 2: Unattempted albums
# Priority 2: Unattempted albums
cursor . execute ( """
cursor . execute ( """
SELECT a . id , a . title , ar . name AS artist_name
SELECT a . id , a . title , ar . name AS artist_name
@ -171,7 +171,7 @@ class MusicBrainzWorker:
row = cursor . fetchone ( )
row = cursor . fetchone ( )
if row :
if row :
return { ' type ' : ' album ' , ' id ' : row [ 0 ] , ' name ' : row [ 1 ] , ' artist ' : row [ 2 ] }
return { ' type ' : ' album ' , ' id ' : row [ 0 ] , ' name ' : row [ 1 ] , ' artist ' : row [ 2 ] }
# Priority 3: Unattempted tracks
# Priority 3: Unattempted tracks
cursor . execute ( """
cursor . execute ( """
SELECT t . id , t . title , ar . name AS artist_name
SELECT t . id , t . title , ar . name AS artist_name
@ -184,7 +184,7 @@ class MusicBrainzWorker:
row = cursor . fetchone ( )
row = cursor . fetchone ( )
if row :
if row :
return { ' type ' : ' track ' , ' id ' : row [ 0 ] , ' name ' : row [ 1 ] , ' artist ' : row [ 2 ] }
return { ' type ' : ' track ' , ' id ' : row [ 0 ] , ' name ' : row [ 1 ] , ' artist ' : row [ 2 ] }
# Priority 4: Retry 'not_found' artists after retry_days
# Priority 4: Retry 'not_found' artists after retry_days
cutoff_date = datetime . now ( ) - timedelta ( days = self . retry_days )
cutoff_date = datetime . now ( ) - timedelta ( days = self . retry_days )
cursor . execute ( """
cursor . execute ( """
@ -199,7 +199,7 @@ class MusicBrainzWorker:
if row :
if row :
logger . info ( f " Retrying artist ' { row [ 1 ] } ' (last attempted: { cutoff_date } ) " )
logger . info ( f " Retrying artist ' { row [ 1 ] } ' (last attempted: { cutoff_date } ) " )
return { ' type ' : ' artist ' , ' id ' : row [ 0 ] , ' name ' : row [ 1 ] }
return { ' type ' : ' artist ' , ' id ' : row [ 0 ] , ' name ' : row [ 1 ] }
# Priority 5: Retry 'not_found' albums
# Priority 5: Retry 'not_found' albums
cursor . execute ( """
cursor . execute ( """
SELECT a . id , a . title , ar . name AS artist_name
SELECT a . id , a . title , ar . name AS artist_name
@ -213,7 +213,7 @@ class MusicBrainzWorker:
row = cursor . fetchone ( )
row = cursor . fetchone ( )
if row :
if row :
return { ' type ' : ' album ' , ' id ' : row [ 0 ] , ' name ' : row [ 1 ] , ' artist ' : row [ 2 ] }
return { ' type ' : ' album ' , ' id ' : row [ 0 ] , ' name ' : row [ 1 ] , ' artist ' : row [ 2 ] }
# Priority 6: Retry 'not_found' tracks
# Priority 6: Retry 'not_found' tracks
cursor . execute ( """
cursor . execute ( """
SELECT t . id , t . title , ar . name AS artist_name
SELECT t . id , t . title , ar . name AS artist_name
@ -227,25 +227,25 @@ class MusicBrainzWorker:
row = cursor . fetchone ( )
row = cursor . fetchone ( )
if row :
if row :
return { ' type ' : ' track ' , ' id ' : row [ 0 ] , ' name ' : row [ 1 ] , ' artist ' : row [ 2 ] }
return { ' type ' : ' track ' , ' id ' : row [ 0 ] , ' name ' : row [ 1 ] , ' artist ' : row [ 2 ] }
return None
return None
except Exception as e :
except Exception as e :
logger . error ( f " Error getting next item: { e } " )
logger . error ( f " Error getting next item: { e } " )
return None
return None
finally :
finally :
if conn :
if conn :
conn . close ( )
conn . close ( )
def _process_item ( self , item : Dict [ str , Any ] ) :
def _process_item ( self , item : Dict [ str , Any ] ) :
""" Process a single item (artist, album, or track) """
""" Process a single item (artist, album, or track) """
try :
try :
item_type = item [ ' type ' ]
item_type = item [ ' type ' ]
item_id = item [ ' id ' ]
item_id = item [ ' id ' ]
item_name = item [ ' name ' ]
item_name = item [ ' name ' ]
logger . debug ( f " Processing { item_type } # { item_id } : { item_name } " )
logger . debug ( f " Processing { item_type } # { item_id } : { item_name } " )
if item_type == ' artist ' :
if item_type == ' artist ' :
result = self . mb_service . match_artist ( item_name )
result = self . mb_service . match_artist ( item_name )
if result and result . get ( ' mbid ' ) :
if result and result . get ( ' mbid ' ) :
@ -256,7 +256,7 @@ class MusicBrainzWorker:
self . mb_service . update_artist_mbid ( item_id , None , ' not_found ' )
self . mb_service . update_artist_mbid ( item_id , None , ' not_found ' )
self . stats [ ' not_found ' ] + = 1
self . stats [ ' not_found ' ] + = 1
logger . debug ( f " ❌ No match for artist ' { item_name } ' " )
logger . debug ( f " ❌ No match for artist ' { item_name } ' " )
elif item_type == ' album ' :
elif item_type == ' album ' :
artist_name = item . get ( ' artist ' )
artist_name = item . get ( ' artist ' )
result = self . mb_service . match_release ( item_name , artist_name )
result = self . mb_service . match_release ( item_name , artist_name )
@ -268,7 +268,7 @@ class MusicBrainzWorker:
self . mb_service . update_album_mbid ( item_id , None , ' not_found ' )
self . mb_service . update_album_mbid ( item_id , None , ' not_found ' )
self . stats [ ' not_found ' ] + = 1
self . stats [ ' not_found ' ] + = 1
logger . debug ( f " ❌ No match for album ' { item_name } ' " )
logger . debug ( f " ❌ No match for album ' { item_name } ' " )
elif item_type == ' track ' :
elif item_type == ' track ' :
artist_name = item . get ( ' artist ' )
artist_name = item . get ( ' artist ' )
result = self . mb_service . match_recording ( item_name , artist_name )
result = self . mb_service . match_recording ( item_name , artist_name )
@ -280,11 +280,11 @@ class MusicBrainzWorker:
self . mb_service . update_track_mbid ( item_id , None , ' not_found ' )
self . mb_service . update_track_mbid ( item_id , None , ' not_found ' )
self . stats [ ' not_found ' ] + = 1
self . stats [ ' not_found ' ] + = 1
logger . debug ( f " ❌ No match for track ' { item_name } ' " )
logger . debug ( f " ❌ No match for track ' { item_name } ' " )
except Exception as e :
except Exception as e :
logger . error ( f " Error processing { item [ ' type ' ] } # { item [ ' id ' ] } : { e } " )
logger . error ( f " Error processing { item [ ' type ' ] } # { item [ ' id ' ] } : { e } " )
self . stats [ ' errors ' ] + = 1
self . stats [ ' errors ' ] + = 1
# Mark as error in database
# Mark as error in database
try :
try :
if item [ ' type ' ] == ' artist ' :
if item [ ' type ' ] == ' artist ' :
@ -295,46 +295,46 @@ class MusicBrainzWorker:
self . mb_service . update_track_mbid ( item [ ' id ' ] , None , ' error ' )
self . mb_service . update_track_mbid ( item [ ' id ' ] , None , ' error ' )
except Exception as e2 :
except Exception as e2 :
logger . error ( f " Error updating item status: { e2 } " )
logger . error ( f " Error updating item status: { e2 } " )
def _count_pending_items ( self ) - > int :
def _count_pending_items ( self ) - > int :
""" Count how many items still need processing """
""" Count how many items still need processing """
conn = None
conn = None
try :
try :
conn = self . db . _get_connection ( )
conn = self . db . _get_connection ( )
cursor = conn . cursor ( )
cursor = conn . cursor ( )
# Count unattempted items
# Count unattempted items
cursor . execute ( """
cursor . execute ( """
SELECT
SELECT
( SELECT COUNT ( * ) FROM artists WHERE musicbrainz_match_status IS NULL ) +
( SELECT COUNT ( * ) FROM artists WHERE musicbrainz_match_status IS NULL ) +
( SELECT COUNT ( * ) FROM albums WHERE musicbrainz_match_status IS NULL ) +
( SELECT COUNT ( * ) FROM albums WHERE musicbrainz_match_status IS NULL ) +
( SELECT COUNT ( * ) FROM tracks WHERE musicbrainz_match_status IS NULL )
( SELECT COUNT ( * ) FROM tracks WHERE musicbrainz_match_status IS NULL )
AS pending
AS pending
""" )
""" )
row = cursor . fetchone ( )
row = cursor . fetchone ( )
return row [ 0 ] if row else 0
return row [ 0 ] if row else 0
except Exception as e :
except Exception as e :
logger . error ( f " Error counting pending items: { e } " )
logger . error ( f " Error counting pending items: { e } " )
return 0
return 0
finally :
finally :
if conn :
if conn :
conn . close ( )
conn . close ( )
def _get_progress_breakdown ( self ) - > Dict [ str , Dict [ str , int ] ] :
def _get_progress_breakdown ( self ) - > Dict [ str , Dict [ str , int ] ] :
""" Get progress breakdown by entity type """
""" Get progress breakdown by entity type """
conn = None
conn = None
try :
try :
conn = self . db . _get_connection ( )
conn = self . db . _get_connection ( )
cursor = conn . cursor ( )
cursor = conn . cursor ( )
progress = { }
progress = { }
# Artists progress
# Artists progress
cursor . execute ( """
cursor . execute ( """
SELECT
SELECT
COUNT ( * ) AS total ,
COUNT ( * ) AS total ,
SUM ( CASE WHEN musicbrainz_match_status IS NOT NULL THEN 1 ELSE 0 END ) AS processed
SUM ( CASE WHEN musicbrainz_match_status IS NOT NULL THEN 1 ELSE 0 END ) AS processed
FROM artists
FROM artists
@ -347,10 +347,10 @@ class MusicBrainzWorker:
' total ' : total ,
' total ' : total ,
' percent ' : int ( ( processed / total * 100 ) if total > 0 else 0 )
' percent ' : int ( ( processed / total * 100 ) if total > 0 else 0 )
}
}
# Albums progress
# Albums progress
cursor . execute ( """
cursor . execute ( """
SELECT
SELECT
COUNT ( * ) AS total ,
COUNT ( * ) AS total ,
SUM ( CASE WHEN musicbrainz_match_status IS NOT NULL THEN 1 ELSE 0 END ) AS processed
SUM ( CASE WHEN musicbrainz_match_status IS NOT NULL THEN 1 ELSE 0 END ) AS processed
FROM albums
FROM albums
@ -363,10 +363,10 @@ class MusicBrainzWorker:
' total ' : total ,
' total ' : total ,
' percent ' : int ( ( processed / total * 100 ) if total > 0 else 0 )
' percent ' : int ( ( processed / total * 100 ) if total > 0 else 0 )
}
}
# Tracks progress
# Tracks progress
cursor . execute ( """
cursor . execute ( """
SELECT
SELECT
COUNT ( * ) AS total ,
COUNT ( * ) AS total ,
SUM ( CASE WHEN musicbrainz_match_status IS NOT NULL THEN 1 ELSE 0 END ) AS processed
SUM ( CASE WHEN musicbrainz_match_status IS NOT NULL THEN 1 ELSE 0 END ) AS processed
FROM tracks
FROM tracks
@ -379,9 +379,9 @@ class MusicBrainzWorker:
' total ' : total ,
' total ' : total ,
' percent ' : int ( ( processed / total * 100 ) if total > 0 else 0 )
' percent ' : int ( ( processed / total * 100 ) if total > 0 else 0 )
}
}
return progress
return progress
except Exception as e :
except Exception as e :
logger . error ( f " Error getting progress breakdown: { e } " )
logger . error ( f " Error getting progress breakdown: { e } " )
return { }
return { }