Phase 3b: per-listener stream sessions (no more shared-playback collision)

Wires the StreamStateStore (Phase 3a) into the live routes so each browser/
device gets its OWN playback instead of every client sharing one global
stream_state. Fixes the long-standing limit where a second tab/device/listener
would hijack the one playback.

- _stream_session_id(): stable per-browser id stored in the Flask session
  cookie (falls back to DEFAULT when no request context — e.g. the socket
  broadcast thread — so single-user behavior is identical).
- _current_stream_state(): the StreamSession for the calling browser.
- Routes rewired to the caller's session + its own lock: /api/library/play,
  /api/stream/start, /api/stream/status, /stream/audio, /api/stream/stop.
- Background tasks tracked per session (stream_tasks[sid]) instead of one
  global Future, so stopping/replacing one listener's stream doesn't cancel
  another's. Executor bumped 1 -> 4 workers so concurrent listeners don't
  queue behind each other.
- Per-session staging: named sessions stage under Stream/<sid>/Stream so
  listeners never clear each other's files; default session keeps flat Stream/.
- /stream/status now returns THIS listener's state, which is what the frontend
  polls — so per-listener works without touching the socket broadcast (left
  serving the default session, now vestigial for status).

Isolation invariant covered by tests/streaming/test_stream_state_store.py
(distinct sessions independent, default stable). The route-level cookie wiring
+ actual two-client no-collision behavior need live multi-client verification
(can't be tested without booting Flask + separate cookies) — EXPERIMENTAL,
needs Boulder to test with 2 browsers/devices. 33 streaming tests still pass;
web_server parses.
pull/761/head
BoulderBadgeDad 3 weeks ago
parent 866f2e4a23
commit f617458962

@ -755,11 +755,40 @@ logger.info("Core service initialization complete.")
# s.update) so the ~20 existing call sites work unchanged. ``stream_lock`` is
# that session's own lock, so ``with stream_lock:`` guards exactly what it did.
from core.streaming.state import StreamStateStore as _StreamStateStore
from core.streaming.state import DEFAULT_SESSION as _DEFAULT_STREAM_SESSION
stream_state_store = _StreamStateStore()
stream_state = stream_state_store.get() # DEFAULT_SESSION
stream_state = stream_state_store.get() # DEFAULT_SESSION (back-compat alias)
stream_lock = stream_state.lock
stream_background_task = None
stream_executor = ThreadPoolExecutor(max_workers=1) # Only one stream at a time
# Phase 3b — per-listener playback: each browser/device gets its own stream
# session so two listeners no longer collide on one global. Background tasks are
# tracked per session id. max_workers bumped so concurrent listeners don't queue
# behind each other. Single-user behavior is unchanged (one cookie → one session).
stream_background_task = None # legacy alias (default session)
stream_tasks = {} # session_id -> Future
stream_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="StreamPrep")
def _stream_session_id():
"""Stable per-browser stream session id, from the Flask session cookie.
Falls back to the DEFAULT session when there's no request context or no
cookie yet (e.g. the 1s socket broadcast thread) so single-user behavior
is identical to before. Each distinct browser/device gets its own id and
therefore its own independent playback + Stream/<id> staging dir.
"""
try:
sid = session.get('stream_sid')
if not sid:
sid = uuid.uuid4().hex[:16]
session['stream_sid'] = sid
return sid
except Exception:
return _DEFAULT_STREAM_SESSION
def _current_stream_state():
"""The StreamSession for the calling browser (dict-compatible)."""
return stream_state_store.get(_stream_session_id())
# Global OAuth State Management
# Store PKCE values for Tidal OAuth flow
@ -1782,25 +1811,36 @@ _EDITION_BARE_RE = _re.compile(
from core.streaming import prepare as _streaming_prepare
def _build_prepare_stream_deps():
"""Build the PrepareStreamDeps bundle from web_server.py globals on each call."""
def _build_prepare_stream_deps(sess, sid):
"""Build the PrepareStreamDeps bundle for a specific stream session.
``sess`` is the StreamSession (dict-compatible) for this listener; ``sid`` is
its id, used to give each session its own ``Stream/<sid>`` staging subdir so
concurrent listeners don't clear each other's files.
"""
def _get_stream_state():
return stream_state
return sess
def _set_stream_state(value):
# prepare.py only ever mutates in place (.update / [k]=), so this is
# effectively dead — but if anything DOES reassign, route it through
# the session's replace() so the store's default session stays the live
# object instead of being detached by a global rebind.
if value is stream_state:
# prepare.py only mutates in place (.update / [k]=) so this is
# effectively dead — but if anything reassigns, route it through the
# session's replace() so the store keeps the live object.
if value is sess:
return
stream_state.replace(dict(value))
sess.replace(dict(value))
base_root = os.path.dirname(os.path.abspath(__file__))
# prepare.py stages into <project_root>/Stream. Default session keeps the
# historical flat Stream/; a named session stages under Stream/<sid>/Stream so
# concurrent listeners never clear each other's files. (The served file_path
# is absolute, so staging location only affects isolation/cleanup.)
project_root = base_root if sid == _DEFAULT_STREAM_SESSION else os.path.join(base_root, 'Stream', sid)
return _streaming_prepare.PrepareStreamDeps(
config_manager=config_manager,
download_orchestrator=download_orchestrator,
stream_lock=stream_lock,
project_root=os.path.dirname(os.path.abspath(__file__)),
stream_lock=sess.lock,
project_root=project_root,
docker_resolve_path=docker_resolve_path,
find_streaming_download_in_all_downloads=_find_streaming_download_in_all_downloads,
find_downloaded_file=_find_downloaded_file,
@ -1811,8 +1851,8 @@ def _build_prepare_stream_deps():
)
def _prepare_stream_task(track_data):
return _streaming_prepare.prepare_stream_task(track_data, _build_prepare_stream_deps())
def _prepare_stream_task(track_data, sess, sid):
return _streaming_prepare.prepare_stream_task(track_data, _build_prepare_stream_deps(sess, sid))
def _find_streaming_download_in_all_downloads(all_downloads, track_data):
@ -10588,9 +10628,10 @@ def library_play_track():
logger.info(f"Library play request: {os.path.basename(file_path)}")
# Set stream state to ready with the library file path directly
with stream_lock:
stream_state.update({
# Set THIS listener's stream state to ready with the library file path.
sess = _current_stream_state()
with sess.lock:
sess.update({
"status": "ready",
"progress": 100,
"track_info": {
@ -11745,23 +11786,24 @@ def library_radio():
@app.route('/api/stream/start', methods=['POST'])
def stream_start():
"""Start streaming a track in the background"""
global stream_background_task
"""Start streaming a track in the background (per-listener session)."""
data = request.get_json()
if not data:
return jsonify({"success": False, "error": "No track data provided"}), 400
logger.info(f"Web UI Stream request for: {data.get('filename')}")
try:
# Stop any existing streaming task
if stream_background_task and not stream_background_task.done():
stream_background_task.cancel()
# Reset stream state
with stream_lock:
stream_state.update({
sid = _stream_session_id()
sess = stream_state_store.get(sid)
# Stop only THIS listener's existing task — others keep playing.
prev = stream_tasks.get(sid)
if prev and not prev.done():
prev.cancel()
with sess.lock:
sess.update({
"status": "stopped",
"progress": 0,
"track_info": None,
@ -11769,27 +11811,31 @@ def stream_start():
"error_message": None,
"is_library": False
})
# Start new background streaming task
stream_background_task = stream_executor.submit(_prepare_stream_task, data)
# Start new background streaming task for this session.
fut = stream_executor.submit(_prepare_stream_task, data, sess, sid)
stream_tasks[sid] = fut
if sid == _DEFAULT_STREAM_SESSION:
global stream_background_task
stream_background_task = fut # keep legacy alias in sync
return jsonify({"success": True, "message": "Streaming started"})
except Exception as e:
logger.error(f"Error starting stream: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/stream/status')
def stream_status():
"""Get current streaming status and progress"""
"""Get current streaming status and progress for THIS listener."""
try:
with stream_lock:
# Return copy of current stream state
sess = _current_stream_state()
with sess.lock:
return jsonify({
"status": stream_state["status"],
"progress": stream_state["progress"],
"track_info": stream_state["track_info"],
"error_message": stream_state["error_message"]
"status": sess["status"],
"progress": sess["progress"],
"track_info": sess["track_info"],
"error_message": sess["error_message"]
})
except Exception as e:
logger.error(f"Error getting stream status: {e}")
@ -11869,12 +11915,13 @@ def _serve_audio_file_with_range(file_path):
@app.route('/stream/audio')
def stream_audio():
"""Serve the audio file from the Stream folder with range request support"""
"""Serve THIS listener's current audio file with range request support."""
try:
with stream_lock:
if stream_state["status"] != "ready" or not stream_state["file_path"]:
sess = _current_stream_state()
with sess.lock:
if sess["status"] != "ready" or not sess["file_path"]:
return jsonify({"error": "No audio file ready for streaming"}), 404
file_path = stream_state["file_path"]
file_path = sess["file_path"]
logger.info(f"Serving audio file: {os.path.basename(file_path)}")
return _serve_audio_file_with_range(file_path)
@ -11907,21 +11954,25 @@ def stream_library_audio():
@app.route('/api/stream/stop', methods=['POST'])
def stream_stop():
"""Stop streaming and clean up"""
global stream_background_task
"""Stop THIS listener's stream and clean up its staging dir."""
try:
# Cancel background task
if stream_background_task and not stream_background_task.done():
stream_background_task.cancel()
sid = _stream_session_id()
sess = stream_state_store.get(sid)
# Cancel only this session's background task.
fut = stream_tasks.get(sid)
if fut and not fut.done():
fut.cancel()
# Only clear Stream folder if NOT playing a library file
with stream_lock:
is_library = stream_state.get("is_library", False)
# Only clear the (per-session) Stream folder if NOT a library file.
with sess.lock:
is_library = sess.get("is_library", False)
if not is_library:
project_root = os.path.dirname(os.path.abspath(__file__))
stream_folder = os.path.join(project_root, 'Stream')
base_root = os.path.dirname(os.path.abspath(__file__))
stream_folder = (os.path.join(base_root, 'Stream')
if sid == _DEFAULT_STREAM_SESSION
else os.path.join(base_root, 'Stream', sid, 'Stream'))
if os.path.exists(stream_folder):
for filename in os.listdir(stream_folder):
@ -11932,9 +11983,9 @@ def stream_stop():
else:
logger.info("Library playback stopped - skipping file deletion")
# Reset stream state
with stream_lock:
stream_state.update({
# Reset this session's stream state
with sess.lock:
sess.update({
"status": "stopped",
"progress": 0,
"track_info": None,

Loading…
Cancel
Save