diff --git a/web_server.py b/web_server.py index 7cc6c11e..51b160e6 100644 --- a/web_server.py +++ b/web_server.py @@ -755,11 +755,40 @@ logger.info("Core service initialization complete.") # s.update) so the ~20 existing call sites work unchanged. ``stream_lock`` is # that session's own lock, so ``with stream_lock:`` guards exactly what it did. from core.streaming.state import StreamStateStore as _StreamStateStore +from core.streaming.state import DEFAULT_SESSION as _DEFAULT_STREAM_SESSION stream_state_store = _StreamStateStore() -stream_state = stream_state_store.get() # DEFAULT_SESSION +stream_state = stream_state_store.get() # DEFAULT_SESSION (back-compat alias) stream_lock = stream_state.lock -stream_background_task = None -stream_executor = ThreadPoolExecutor(max_workers=1) # Only one stream at a time +# Phase 3b — per-listener playback: each browser/device gets its own stream +# session so two listeners no longer collide on one global. Background tasks are +# tracked per session id. max_workers bumped so concurrent listeners don't queue +# behind each other. Single-user behavior is unchanged (one cookie → one session). +stream_background_task = None # legacy alias (default session) +stream_tasks = {} # session_id -> Future +stream_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="StreamPrep") + + +def _stream_session_id(): + """Stable per-browser stream session id, from the Flask session cookie. + + Falls back to the DEFAULT session when there's no request context or no + cookie yet (e.g. the 1s socket broadcast thread) — so single-user behavior + is identical to before. Each distinct browser/device gets its own id and + therefore its own independent playback + Stream/ staging dir. + """ + try: + sid = session.get('stream_sid') + if not sid: + sid = uuid.uuid4().hex[:16] + session['stream_sid'] = sid + return sid + except Exception: + return _DEFAULT_STREAM_SESSION + + +def _current_stream_state(): + """The StreamSession for the calling browser (dict-compatible).""" + return stream_state_store.get(_stream_session_id()) # Global OAuth State Management # Store PKCE values for Tidal OAuth flow @@ -1782,25 +1811,36 @@ _EDITION_BARE_RE = _re.compile( from core.streaming import prepare as _streaming_prepare -def _build_prepare_stream_deps(): - """Build the PrepareStreamDeps bundle from web_server.py globals on each call.""" +def _build_prepare_stream_deps(sess, sid): + """Build the PrepareStreamDeps bundle for a specific stream session. + + ``sess`` is the StreamSession (dict-compatible) for this listener; ``sid`` is + its id, used to give each session its own ``Stream/`` staging subdir so + concurrent listeners don't clear each other's files. + """ def _get_stream_state(): - return stream_state + return sess def _set_stream_state(value): - # prepare.py only ever mutates in place (.update / [k]=), so this is - # effectively dead — but if anything DOES reassign, route it through - # the session's replace() so the store's default session stays the live - # object instead of being detached by a global rebind. - if value is stream_state: + # prepare.py only mutates in place (.update / [k]=) so this is + # effectively dead — but if anything reassigns, route it through the + # session's replace() so the store keeps the live object. + if value is sess: return - stream_state.replace(dict(value)) + sess.replace(dict(value)) + + base_root = os.path.dirname(os.path.abspath(__file__)) + # prepare.py stages into /Stream. Default session keeps the + # historical flat Stream/; a named session stages under Stream//Stream so + # concurrent listeners never clear each other's files. (The served file_path + # is absolute, so staging location only affects isolation/cleanup.) + project_root = base_root if sid == _DEFAULT_STREAM_SESSION else os.path.join(base_root, 'Stream', sid) return _streaming_prepare.PrepareStreamDeps( config_manager=config_manager, download_orchestrator=download_orchestrator, - stream_lock=stream_lock, - project_root=os.path.dirname(os.path.abspath(__file__)), + stream_lock=sess.lock, + project_root=project_root, docker_resolve_path=docker_resolve_path, find_streaming_download_in_all_downloads=_find_streaming_download_in_all_downloads, find_downloaded_file=_find_downloaded_file, @@ -1811,8 +1851,8 @@ def _build_prepare_stream_deps(): ) -def _prepare_stream_task(track_data): - return _streaming_prepare.prepare_stream_task(track_data, _build_prepare_stream_deps()) +def _prepare_stream_task(track_data, sess, sid): + return _streaming_prepare.prepare_stream_task(track_data, _build_prepare_stream_deps(sess, sid)) def _find_streaming_download_in_all_downloads(all_downloads, track_data): @@ -10588,9 +10628,10 @@ def library_play_track(): logger.info(f"Library play request: {os.path.basename(file_path)}") - # Set stream state to ready with the library file path directly - with stream_lock: - stream_state.update({ + # Set THIS listener's stream state to ready with the library file path. + sess = _current_stream_state() + with sess.lock: + sess.update({ "status": "ready", "progress": 100, "track_info": { @@ -11745,23 +11786,24 @@ def library_radio(): @app.route('/api/stream/start', methods=['POST']) def stream_start(): - """Start streaming a track in the background""" - global stream_background_task - + """Start streaming a track in the background (per-listener session).""" data = request.get_json() if not data: return jsonify({"success": False, "error": "No track data provided"}), 400 - + logger.info(f"Web UI Stream request for: {data.get('filename')}") - + try: - # Stop any existing streaming task - if stream_background_task and not stream_background_task.done(): - stream_background_task.cancel() - - # Reset stream state - with stream_lock: - stream_state.update({ + sid = _stream_session_id() + sess = stream_state_store.get(sid) + + # Stop only THIS listener's existing task — others keep playing. + prev = stream_tasks.get(sid) + if prev and not prev.done(): + prev.cancel() + + with sess.lock: + sess.update({ "status": "stopped", "progress": 0, "track_info": None, @@ -11769,27 +11811,31 @@ def stream_start(): "error_message": None, "is_library": False }) - - # Start new background streaming task - stream_background_task = stream_executor.submit(_prepare_stream_task, data) - + + # Start new background streaming task for this session. + fut = stream_executor.submit(_prepare_stream_task, data, sess, sid) + stream_tasks[sid] = fut + if sid == _DEFAULT_STREAM_SESSION: + global stream_background_task + stream_background_task = fut # keep legacy alias in sync + return jsonify({"success": True, "message": "Streaming started"}) - + except Exception as e: logger.error(f"Error starting stream: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/stream/status') def stream_status(): - """Get current streaming status and progress""" + """Get current streaming status and progress for THIS listener.""" try: - with stream_lock: - # Return copy of current stream state + sess = _current_stream_state() + with sess.lock: return jsonify({ - "status": stream_state["status"], - "progress": stream_state["progress"], - "track_info": stream_state["track_info"], - "error_message": stream_state["error_message"] + "status": sess["status"], + "progress": sess["progress"], + "track_info": sess["track_info"], + "error_message": sess["error_message"] }) except Exception as e: logger.error(f"Error getting stream status: {e}") @@ -11869,12 +11915,13 @@ def _serve_audio_file_with_range(file_path): @app.route('/stream/audio') def stream_audio(): - """Serve the audio file from the Stream folder with range request support""" + """Serve THIS listener's current audio file with range request support.""" try: - with stream_lock: - if stream_state["status"] != "ready" or not stream_state["file_path"]: + sess = _current_stream_state() + with sess.lock: + if sess["status"] != "ready" or not sess["file_path"]: return jsonify({"error": "No audio file ready for streaming"}), 404 - file_path = stream_state["file_path"] + file_path = sess["file_path"] logger.info(f"Serving audio file: {os.path.basename(file_path)}") return _serve_audio_file_with_range(file_path) @@ -11907,21 +11954,25 @@ def stream_library_audio(): @app.route('/api/stream/stop', methods=['POST']) def stream_stop(): - """Stop streaming and clean up""" - global stream_background_task - + """Stop THIS listener's stream and clean up its staging dir.""" try: - # Cancel background task - if stream_background_task and not stream_background_task.done(): - stream_background_task.cancel() + sid = _stream_session_id() + sess = stream_state_store.get(sid) + + # Cancel only this session's background task. + fut = stream_tasks.get(sid) + if fut and not fut.done(): + fut.cancel() - # Only clear Stream folder if NOT playing a library file - with stream_lock: - is_library = stream_state.get("is_library", False) + # Only clear the (per-session) Stream folder if NOT a library file. + with sess.lock: + is_library = sess.get("is_library", False) if not is_library: - project_root = os.path.dirname(os.path.abspath(__file__)) - stream_folder = os.path.join(project_root, 'Stream') + base_root = os.path.dirname(os.path.abspath(__file__)) + stream_folder = (os.path.join(base_root, 'Stream') + if sid == _DEFAULT_STREAM_SESSION + else os.path.join(base_root, 'Stream', sid, 'Stream')) if os.path.exists(stream_folder): for filename in os.listdir(stream_folder): @@ -11932,9 +11983,9 @@ def stream_stop(): else: logger.info("Library playback stopped - skipping file deletion") - # Reset stream state - with stream_lock: - stream_state.update({ + # Reset this session's stream state + with sess.lock: + sess.update({ "status": "stopped", "progress": 0, "track_info": None,