@ -755,11 +755,40 @@ logger.info("Core service initialization complete.")
# s.update) so the ~20 existing call sites work unchanged. ``stream_lock`` is
# that session's own lock, so ``with stream_lock:`` guards exactly what it did.
from core . streaming . state import StreamStateStore as _StreamStateStore
from core . streaming . state import DEFAULT_SESSION as _DEFAULT_STREAM_SESSION
stream_state_store = _StreamStateStore ( )
stream_state = stream_state_store . get ( ) # DEFAULT_SESSION
stream_state = stream_state_store . get ( ) # DEFAULT_SESSION (back-compat alias)
stream_lock = stream_state . lock
stream_background_task = None
stream_executor = ThreadPoolExecutor ( max_workers = 1 ) # Only one stream at a time
# Phase 3b — per-listener playback: each browser/device gets its own stream
# session so two listeners no longer collide on one global. Background tasks are
# tracked per session id. max_workers bumped so concurrent listeners don't queue
# behind each other. Single-user behavior is unchanged (one cookie → one session).
stream_background_task = None # legacy alias (default session)
stream_tasks = { } # session_id -> Future
stream_executor = ThreadPoolExecutor ( max_workers = 4 , thread_name_prefix = " StreamPrep " )
def _stream_session_id ( ) :
""" Stable per-browser stream session id, from the Flask session cookie.
Falls back to the DEFAULT session when there ' s no request context or no
cookie yet ( e . g . the 1 s socket broadcast thread ) — so single - user behavior
is identical to before . Each distinct browser / device gets its own id and
therefore its own independent playback + Stream / < id > staging dir .
"""
try :
sid = session . get ( ' stream_sid ' )
if not sid :
sid = uuid . uuid4 ( ) . hex [ : 16 ]
session [ ' stream_sid ' ] = sid
return sid
except Exception :
return _DEFAULT_STREAM_SESSION
def _current_stream_state ( ) :
""" The StreamSession for the calling browser (dict-compatible). """
return stream_state_store . get ( _stream_session_id ( ) )
# Global OAuth State Management
# Store PKCE values for Tidal OAuth flow
@ -1782,25 +1811,36 @@ _EDITION_BARE_RE = _re.compile(
from core . streaming import prepare as _streaming_prepare
def _build_prepare_stream_deps ( ) :
""" Build the PrepareStreamDeps bundle from web_server.py globals on each call. """
def _build_prepare_stream_deps ( sess , sid ) :
""" Build the PrepareStreamDeps bundle for a specific stream session.
` ` sess ` ` is the StreamSession ( dict - compatible ) for this listener ; ` ` sid ` ` is
its id , used to give each session its own ` ` Stream / < sid > ` ` staging subdir so
concurrent listeners don ' t clear each other ' s files .
"""
def _get_stream_state ( ) :
return stream_state
return s ess
def _set_stream_state ( value ) :
# prepare.py only ever mutates in place (.update / [k]=), so this is
# effectively dead — but if anything DOES reassign, route it through
# the session's replace() so the store's default session stays the live
# object instead of being detached by a global rebind.
if value is stream_state :
# prepare.py only mutates in place (.update / [k]=) so this is
# effectively dead — but if anything reassigns, route it through the
# session's replace() so the store keeps the live object.
if value is sess :
return
stream_state . replace ( dict ( value ) )
sess . replace ( dict ( value ) )
base_root = os . path . dirname ( os . path . abspath ( __file__ ) )
# prepare.py stages into <project_root>/Stream. Default session keeps the
# historical flat Stream/; a named session stages under Stream/<sid>/Stream so
# concurrent listeners never clear each other's files. (The served file_path
# is absolute, so staging location only affects isolation/cleanup.)
project_root = base_root if sid == _DEFAULT_STREAM_SESSION else os . path . join ( base_root , ' Stream ' , sid )
return _streaming_prepare . PrepareStreamDeps (
config_manager = config_manager ,
download_orchestrator = download_orchestrator ,
stream_lock = stream_lock ,
project_root = os . path . dirname ( os . path . abspath ( __file__ ) ) ,
stream_lock = s ess. lock,
project_root = project_root ,
docker_resolve_path = docker_resolve_path ,
find_streaming_download_in_all_downloads = _find_streaming_download_in_all_downloads ,
find_downloaded_file = _find_downloaded_file ,
@ -1811,8 +1851,8 @@ def _build_prepare_stream_deps():
)
def _prepare_stream_task ( track_data ):
return _streaming_prepare . prepare_stream_task ( track_data , _build_prepare_stream_deps ( ) )
def _prepare_stream_task ( track_data , sess , sid ):
return _streaming_prepare . prepare_stream_task ( track_data , _build_prepare_stream_deps ( sess , sid ) )
def _find_streaming_download_in_all_downloads ( all_downloads , track_data ) :
@ -10588,9 +10628,10 @@ def library_play_track():
logger . info ( f " Library play request: { os . path . basename ( file_path ) } " )
# Set stream state to ready with the library file path directly
with stream_lock :
stream_state . update ( {
# Set THIS listener's stream state to ready with the library file path.
sess = _current_stream_state ( )
with sess . lock :
sess . update ( {
" status " : " ready " ,
" progress " : 100 ,
" track_info " : {
@ -11745,23 +11786,24 @@ def library_radio():
@app.route ( ' /api/stream/start ' , methods = [ ' POST ' ] )
def stream_start ( ) :
""" Start streaming a track in the background """
global stream_background_task
""" Start streaming a track in the background (per-listener session). """
data = request . get_json ( )
if not data :
return jsonify ( { " success " : False , " error " : " No track data provided " } ) , 400
logger . info ( f " Web UI Stream request for: { data . get ( ' filename ' ) } " )
try :
# Stop any existing streaming task
if stream_background_task and not stream_background_task . done ( ) :
stream_background_task . cancel ( )
# Reset stream state
with stream_lock :
stream_state . update ( {
sid = _stream_session_id ( )
sess = stream_state_store . get ( sid )
# Stop only THIS listener's existing task — others keep playing.
prev = stream_tasks . get ( sid )
if prev and not prev . done ( ) :
prev . cancel ( )
with sess . lock :
sess . update ( {
" status " : " stopped " ,
" progress " : 0 ,
" track_info " : None ,
@ -11769,27 +11811,31 @@ def stream_start():
" error_message " : None ,
" is_library " : False
} )
# Start new background streaming task
stream_background_task = stream_executor . submit ( _prepare_stream_task , data )
# Start new background streaming task for this session.
fut = stream_executor . submit ( _prepare_stream_task , data , sess , sid )
stream_tasks [ sid ] = fut
if sid == _DEFAULT_STREAM_SESSION :
global stream_background_task
stream_background_task = fut # keep legacy alias in sync
return jsonify ( { " success " : True , " message " : " Streaming started " } )
except Exception as e :
logger . error ( f " Error starting stream: { e } " )
return jsonify ( { " success " : False , " error " : str ( e ) } ) , 500
@app.route ( ' /api/stream/status ' )
def stream_status ( ) :
""" Get current streaming status and progress """
""" Get current streaming status and progress for THIS listener. """
try :
with stream_lock :
# Return copy of current stream state
sess = _current_stream_state ( )
with sess . lock :
return jsonify ( {
" status " : s tream_state [ " status " ] ,
" progress " : s tream_state [ " progress " ] ,
" track_info " : s tream_state [ " track_info " ] ,
" error_message " : s tream_state [ " error_message " ]
" status " : s ess [ " status " ] ,
" progress " : s ess [ " progress " ] ,
" track_info " : s ess [ " track_info " ] ,
" error_message " : s ess [ " error_message " ]
} )
except Exception as e :
logger . error ( f " Error getting stream status: { e } " )
@ -11869,12 +11915,13 @@ def _serve_audio_file_with_range(file_path):
@app.route ( ' /stream/audio ' )
def stream_audio ( ) :
""" Serve th e audio file from the Stream folder with range request support"""
""" Serve THIS lis tener' s current audio file with range request support. """
try :
with stream_lock :
if stream_state [ " status " ] != " ready " or not stream_state [ " file_path " ] :
sess = _current_stream_state ( )
with sess . lock :
if sess [ " status " ] != " ready " or not sess [ " file_path " ] :
return jsonify ( { " error " : " No audio file ready for streaming " } ) , 404
file_path = s tream_state [ " file_path " ]
file_path = s ess [ " file_path " ]
logger . info ( f " Serving audio file: { os . path . basename ( file_path ) } " )
return _serve_audio_file_with_range ( file_path )
@ -11907,21 +11954,25 @@ def stream_library_audio():
@app.route ( ' /api/stream/stop ' , methods = [ ' POST ' ] )
def stream_stop ( ) :
""" Stop streaming and clean up """
global stream_background_task
""" Stop THIS listener ' s stream and clean up its staging dir. """
try :
# Cancel background task
if stream_background_task and not stream_background_task . done ( ) :
stream_background_task . cancel ( )
sid = _stream_session_id ( )
sess = stream_state_store . get ( sid )
# Cancel only this session's background task.
fut = stream_tasks . get ( sid )
if fut and not fut . done ( ) :
fut . cancel ( )
# Only clear Stream folder if NOT playing a library file
with stream_lock :
is_library = stream_state . get ( " is_library " , False )
# Only clear the (per-session) Stream folder if NOT a library file.
with s ess. lock:
is_library = s ess . get ( " is_library " , False )
if not is_library :
project_root = os . path . dirname ( os . path . abspath ( __file__ ) )
stream_folder = os . path . join ( project_root , ' Stream ' )
base_root = os . path . dirname ( os . path . abspath ( __file__ ) )
stream_folder = ( os . path . join ( base_root , ' Stream ' )
if sid == _DEFAULT_STREAM_SESSION
else os . path . join ( base_root , ' Stream ' , sid , ' Stream ' ) )
if os . path . exists ( stream_folder ) :
for filename in os . listdir ( stream_folder ) :
@ -11932,9 +11983,9 @@ def stream_stop():
else :
logger . info ( " Library playback stopped - skipping file deletion " )
# Reset stream state
with s tream_ lock:
s tream_state . update ( {
# Reset this session's stream state
with s ess. lock:
s ess . update ( {
" status " : " stopped " ,
" progress " : 0 ,
" track_info " : None ,