@ -6,6 +6,7 @@ enabling MusicBrainz as a search tab in enhanced and global search.
Album art is fetched from Cover Art Archive ( free , linked by release MBID ) .
"""
import threading
import requests
from dataclasses import dataclass
from typing import Any , Dict , List , Optional
@ -121,6 +122,15 @@ class MusicBrainzSearchClient:
# the exact UI minor version would add noise to every request.
self . _client = MusicBrainzClient ( " SoulSync " , " 2 " )
self . _art_cache : Dict [ str , Optional [ str ] ] = { } # mbid -> url
# Per-instance cache for "top artist MBID for this query". The
# backend fires artists/albums/tracks searches in parallel against
# one client instance, and albums+tracks both need the same artist
# lookup. Without this cache, we'd fire 3 identical artist-search
# HTTP calls (each serialized by the 1-rps rate limit = 3 wasted
# seconds). The _Sentinel marks "we already looked and found
# nothing" to prevent repeat no-hit lookups.
self . _artist_mbid_cache : Dict [ str , Optional [ Dict [ str , Any ] ] ] = { }
self . _artist_mbid_lock = threading . Lock ( )
def _cached_art ( self , release_mbid : str , release_group_mbid : str = ' ' ) - > Optional [ str ] :
""" Get cover art with caching. Tries release first, then release group. """
@ -185,31 +195,114 @@ class MusicBrainzSearchClient:
logger . warning ( f " MusicBrainz artist search failed: { e } " )
return [ ]
def _split_structured_query ( self , query : str ) :
""" Split ' Artist - Title ' / ' Artist – Title ' / ' Artist — Title ' if
a separator is present . Returns ( artist_name , title ) or ( None , query ) . """
for sep in [ ' - ' , ' – ' , ' — ' ] :
if sep in query :
parts = query . split ( sep , 1 )
return parts [ 0 ] . strip ( ) , parts [ 1 ] . strip ( )
return None , query
def _resolve_top_artist ( self , query : str ) - > Optional [ Dict [ str , Any ] ] :
""" Return the top-scoring artist for a bare-name query, or None if
nothing scores above threshold . Cached per instance so parallel
album / track searches don ' t each refetch. " " "
if not query :
return None
key = query . strip ( ) . lower ( )
with self . _artist_mbid_lock :
if key in self . _artist_mbid_cache :
return self . _artist_mbid_cache [ key ]
# Do the HTTP call OUTSIDE the lock so other threads can still
# check the cache while we wait on the network.
raw = self . _client . search_artist ( query , limit = 1 , strict = False )
top = None
if raw and ( raw [ 0 ] . get ( ' score ' , 0 ) or 0 ) > = self . _MIN_SCORE :
top = raw [ 0 ]
with self . _artist_mbid_lock :
self . _artist_mbid_cache [ key ] = top
return top
def _release_group_to_album ( self , rg : Dict [ str , Any ] , artist_name : str ) - > Album :
""" Project a MusicBrainz release-group into our Album dataclass. """
rg_mbid = rg . get ( ' id ' , ' ' )
title = rg . get ( ' title ' , ' ' ) or ' '
primary_type = rg . get ( ' primary-type ' , ' ' ) or ' '
secondary_types = rg . get ( ' secondary-types ' , [ ] ) or [ ]
album_type = _map_release_type ( primary_type , secondary_types )
release_date = rg . get ( ' first-release-date ' , ' ' ) or ' '
# Release-group browse doesn't link directly to a single release,
# so we can't get per-release track counts cheaply. Leave 0 — the
# frontend treats it as "unknown" gracefully.
image_url = self . _cached_art ( rg_mbid , rg_mbid )
return Album (
id = rg_mbid ,
name = title ,
artists = [ artist_name ] if artist_name else [ ' Unknown Artist ' ] ,
release_date = release_date ,
total_tracks = 0 ,
album_type = album_type ,
image_url = image_url ,
external_urls = { ' musicbrainz ' : f ' https://musicbrainz.org/release-group/ { rg_mbid } ' } if rg_mbid else { } ,
)
def search_albums ( self , query : str , limit : int = 10 ) - > List [ Album ] :
""" Search MusicBrainz for releases (albums). """
""" Search MusicBrainz for releases (albums).
Primary path : when the query looks like a bare artist name , resolve
it to an artist MBID and BROWSE that artist ' s release-groups. This
returns the artist ' s actual discography instead of unrelated
releases that happen to be titled after them .
Fallback path : when the query is structured as " Artist - Album " or
the artist lookup fails , drop back to text search with the
existing Lucene strategy .
"""
try :
# Try to split "Artist Album" for better matching
artist_name = None
album_name = query
for sep in [ ' - ' , ' – ' , ' — ' ] :
if sep in query :
parts = query . split ( sep , 1 )
artist_name = parts [ 0 ] . strip ( )
album_name = parts [ 1 ] . strip ( )
break
artist_name , title = self . _split_structured_query ( query )
# Structured "Artist - Album" query → respect user's intent;
# text-search with both terms is more precise than browsing all
# of that artist's discography.
if artist_name :
return self . _search_albums_text ( title , artist_name , limit )
# Bare name query → try artist-first → browse path.
top = self . _resolve_top_artist ( query )
if top :
mbid = top . get ( ' id ' , ' ' )
tname = top . get ( ' name ' , ' ' ) or query
rgs = self . _client . browse_artist_release_groups (
mbid ,
release_types = [ ' album ' , ' ep ' , ' single ' , ' compilation ' ] ,
limit = 100 ,
)
# Sort by first-release-date desc (newest first), then by
# primary-type priority (album > ep > single > compilation)
# so the top of the list is a credible "what to explore."
type_priority = { ' album ' : 0 , ' ep ' : 1 , ' single ' : 2 , ' compilation ' : 3 }
def _sort_key ( rg ) :
pt = ( rg . get ( ' primary-type ' ) or ' ' ) . lower ( )
date = rg . get ( ' first-release-date ' ) or ' '
return ( type_priority . get ( pt , 9 ) , - int ( date [ : 4 ] ) if date [ : 4 ] . isdigit ( ) else 0 )
rgs . sort ( key = _sort_key )
albums = [ self . _release_group_to_album ( rg , tname ) for rg in rgs [ : limit ] ]
return albums
# No artist match → text search on the whole query.
return self . _search_albums_text ( query , None , limit )
except Exception as e :
logger . warning ( f " MusicBrainz album search failed: { e } " )
return [ ]
def _search_albums_text ( self , album_name : str , artist_name : Optional [ str ] , limit : int ) - > List [ Album ] :
""" Fallback text-search path for structured/fuzzy album queries. """
try :
results = self . _client . search_release ( album_name , artist_name = artist_name , limit = limit )
# If no separator, try word-boundary splitting
if not results and not artist_name :
words = query . split ( )
for i in range ( 1 , len ( words ) ) :
possible_artist = ' ' . join ( words [ : i ] )
possible_album = ' ' . join ( words [ i : ] )
if len ( possible_album ) > = 2 :
results = self . _client . search_release ( possible_album , artist_name = possible_artist , limit = limit )
if results :
break
# Score filter — same threshold as artists. Drops garbage
# title-match hits from unrelated releases.
results = [ r for r in results if ( r . get ( ' score ' , 0 ) or 0 ) > = self . _MIN_SCORE ]
albums = [ ]
for r in results :
@ -274,96 +367,128 @@ class MusicBrainzSearchClient:
logger . warning ( f " MusicBrainz album search failed: { e } " )
return [ ]
def _recording_to_track ( self , r : Dict [ str , Any ] , fallback_artist_name : str ) - > Optional [ Track ] :
""" Project a MusicBrainz recording into our Track dataclass. Returns
None when the recording lacks required fields . """
mbid = r . get ( ' id ' , ' ' )
title = r . get ( ' title ' , ' ' )
if not title :
return None
artists = _extract_artist_credit ( r . get ( ' artist-credit ' , [ ] ) )
if not artists and fallback_artist_name :
artists = [ fallback_artist_name ]
duration_ms = r . get ( ' length ' , 0 ) or 0
album_name = ' '
album_id = ' '
release_date = ' '
image_url = None
album_type = ' single '
total_tracks = 1
releases = r . get ( ' releases ' , [ ] ) or [ ]
if releases :
rel = releases [ 0 ]
album_name = rel . get ( ' title ' , ' ' ) or ' '
album_id = rel . get ( ' id ' , ' ' ) or ' '
release_date = rel . get ( ' date ' , ' ' ) or ' '
rg = rel . get ( ' release-group ' , { } ) or { }
primary_type = rg . get ( ' primary-type ' , ' ' ) or ' '
secondary_types = rg . get ( ' secondary-types ' , [ ] ) or [ ]
album_type = _map_release_type ( primary_type , secondary_types )
for m in rel . get ( ' media ' , [ ] ) or [ ] :
total_tracks + = m . get ( ' track-count ' , 0 )
rg_mbid = rg . get ( ' id ' , ' ' ) or ' '
image_url = self . _cached_art ( album_id , rg_mbid ) if album_id else None
return Track (
id = mbid ,
name = title ,
artists = artists if artists else [ ' Unknown Artist ' ] ,
album = album_name or title ,
duration_ms = duration_ms ,
popularity = r . get ( ' score ' , 0 ) or 0 ,
image_url = image_url ,
release_date = release_date ,
external_urls = { ' musicbrainz ' : f ' https://musicbrainz.org/recording/ { mbid } ' } if mbid else { } ,
album_type = album_type ,
total_tracks = total_tracks ,
album_id = album_id ,
)
def search_tracks ( self , query : str , limit : int = 10 ) - > List [ Track ] :
""" Search MusicBrainz for recordings (tracks). """
""" Search MusicBrainz for recordings (tracks).
Same strategy as ` search_albums ` : bare name → artist - first → browse
recordings ; structured " Artist - Title " stays on text search so the
user ' s explicit title intent is respected.
"""
try :
# Try to split "Artist - Title" for better matching
artist_name = None
track_name = query
for sep in [ ' - ' , ' – ' , ' — ' ] :
if sep in query :
parts = query . split ( sep , 1 )
artist_name = parts [ 0 ] . strip ( )
track_name = parts [ 1 ] . strip ( )
break
artist_name , title = self . _split_structured_query ( query )
# Structured query → text search with both fields.
if artist_name :
return self . _search_tracks_text ( title , artist_name , limit )
# Bare name → artist-first → browse.
top = self . _resolve_top_artist ( query )
if top :
mbid = top . get ( ' id ' , ' ' )
tname = top . get ( ' name ' , ' ' ) or query
recs = self . _client . browse_artist_recordings (
mbid ,
limit = 100 ,
includes = [ ' releases ' , ' artist-credits ' ] ,
)
# Browse returns recordings unsorted. Dedupe by normalized
# title (MB has many live/compilation variants of the same
# song), then sort by release date desc so "newest" tracks
# surface first — matches how the other source tabs look.
seen = set ( )
deduped = [ ]
for r in recs :
key = ( r . get ( ' title ' ) or ' ' ) . lower ( ) . strip ( )
if not key or key in seen :
continue
seen . add ( key )
deduped . append ( r )
def _track_sort_key ( r ) :
rel = ( r . get ( ' releases ' ) or [ { } ] ) [ 0 ]
date = ( rel . get ( ' date ' ) or ' ' ) [ : 4 ]
return - int ( date ) if date . isdigit ( ) else 0
deduped . sort ( key = _track_sort_key )
tracks = [ ]
for r in deduped [ : limit ] :
t = self . _recording_to_track ( r , tname )
if t :
tracks . append ( t )
return tracks
# No artist match → fall back to text search on whole query.
return self . _search_tracks_text ( query , None , limit )
except Exception as e :
logger . warning ( f " MusicBrainz track search failed: { e } " )
return [ ]
def _search_tracks_text ( self , track_name : str , artist_name : Optional [ str ] , limit : int ) - > List [ Track ] :
""" Fallback text-search path for structured/fuzzy track queries. """
try :
results = self . _client . search_recording ( track_name , artist_name = artist_name , limit = limit )
# Score filter matches the artist/album logic — cuts garbage
# title collisions from unrelated recordings.
results = [ r for r in results if ( r . get ( ' score ' , 0 ) or 0 ) > = self . _MIN_SCORE ]
# If no separator found or structured search failed, try the full query
# as both a recording search and an artist+recording combined search
if not results and not artist_name :
# Try each word split as potential artist/title boundary
words = query . split ( )
for i in range ( 1 , len ( words ) ) :
possible_artist = ' ' . join ( words [ : i ] )
possible_track = ' ' . join ( words [ i : ] )
if len ( possible_track ) > = 2 :
results = self . _client . search_recording ( possible_track , artist_name = possible_artist , limit = limit )
if results :
break
tracks = [ ]
for r in results :
mbid = r . get ( ' id ' , ' ' )
title = r . get ( ' title ' , ' ' )
if not title :
continue
artists = _extract_artist_credit ( r . get ( ' artist-credit ' , [ ] ) )
duration_ms = r . get ( ' length ' , 0 ) or 0
# Get album from first release
album_name = ' '
album_id = ' '
release_date = ' '
image_url = None
album_type = ' single '
total_tracks = 1
track_number = None
releases = r . get ( ' releases ' , [ ] )
if releases :
rel = releases [ 0 ]
album_name = rel . get ( ' title ' , ' ' )
album_id = rel . get ( ' id ' , ' ' )
release_date = rel . get ( ' date ' , ' ' ) or ' '
rg = rel . get ( ' release-group ' , { } )
primary_type = rg . get ( ' primary-type ' , ' ' ) or ' '
secondary_types = rg . get ( ' secondary-types ' , [ ] ) or [ ]
album_type = _map_release_type ( primary_type , secondary_types )
media = rel . get ( ' media ' , [ ] )
for m in media :
total_tracks + = m . get ( ' track-count ' , 0 )
# Find track number
for t in m . get ( ' tracks ' , [ ] ) :
if t . get ( ' id ' ) == mbid or t . get ( ' recording ' , { } ) . get ( ' id ' ) == mbid :
try :
track_number = int ( t . get ( ' number ' , t . get ( ' position ' , 0 ) ) )
except ( ValueError , TypeError ) :
pass
# Cover art
rg_mbid = rg . get ( ' id ' , ' ' )
image_url = self . _cached_art ( album_id , rg_mbid ) if album_id else None
external_urls = { ' musicbrainz ' : f ' https://musicbrainz.org/recording/ { mbid } ' } if mbid else { }
tracks . append ( Track (
id = mbid ,
name = title ,
artists = artists if artists else [ ' Unknown Artist ' ] ,
album = album_name or title ,
duration_ms = duration_ms ,
popularity = r . get ( ' score ' , 0 ) ,
image_url = image_url ,
release_date = release_date ,
external_urls = external_urls ,
track_number = track_number ,
album_type = album_type ,
total_tracks = total_tracks ,
album_id = album_id ,
) )
t = self . _recording_to_track ( r , artist_name or ' ' )
if t :
tracks . append ( t )
return tracks
except Exception as e :
logger . warning ( f " MusicBrainz track search failed: { e } " )