From 2c19d7d1f2862572b4cb615fe19a2381ef94200e Mon Sep 17 00:00:00 2001 From: Broque Thomas <26755000+Nezreka@users.noreply.github.com> Date: Tue, 5 May 2026 11:56:09 -0700 Subject: [PATCH] Per-source lock sharding on the engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per JohnBaumb: the single state_lock serialized progress callbacks across every source. Pre-refactor each client owned its own download lock, so Deezer / YouTube / Tidal workers never blocked each other. Multi-source concurrent downloads under the unified lock fought for the same RLock on every progress update. Replaced the engine-wide state_lock with per-source RLocks. Each source gets its own lock, lazily created via _source_lock() on first use (meta-lock guards the create-race). All record mutations (add/update/update_unless_state/remove/get/iter) take only that source's lock — Deezer progress updates no longer block Tidal writes. Cancelled-preserve semantics still hold because cancel + worker terminal write target the same source, so they share that source's lock. New test pins lock independence: holding source-A's lock from one thread does not block a write on source-B from another. --- core/download_engine/engine.py | 62 +++++++++++++++++-------- core/download_engine/worker.py | 2 +- tests/downloads/test_download_engine.py | 45 ++++++++++++++++++ 3 files changed, 88 insertions(+), 21 deletions(-) diff --git a/core/download_engine/engine.py b/core/download_engine/engine.py index ed5df608..a1391111 100644 --- a/core/download_engine/engine.py +++ b/core/download_engine/engine.py @@ -2,8 +2,8 @@ Phase B scope: skeleton only. The engine exposes a place for plugins to register, a single ``active_downloads`` dict keyed by -``(source, download_id)``, and a ``state_lock`` that guards mutations -across the multi-threaded download worker pool. +``(source, download_id)``, and per-source RLocks that guard mutations +without serializing workers across different sources. Subsequent phases bolt more capability on top: - ``dispatch_download(plugin, target_id)`` (Phase C — replaces every @@ -50,22 +50,31 @@ class DownloadEngine: so the engine can answer "which plugin owns this download" in O(1) without iterating every plugin). - Thread safety: every state mutation goes through ``state_lock``. - Read-only accessors (``get_record``, ``iter_records_for_source``) - take the lock briefly and return a SHALLOW COPY so the caller - can iterate without holding the lock. Callers that need to - mutate a record should use ``update_record`` which takes the - lock and applies the patch atomically. + Thread safety: per-source lock sharding. Each source gets its own + RLock — progress callbacks on Deezer don't block Tidal's worker + and vice versa, matching the pre-refactor behavior where each + client owned its own download lock. Read-only accessors + (``get_record``, ``iter_records_for_source``) take the source's + lock briefly and return a SHALLOW COPY so the caller can iterate + without holding the lock. Callers that need to mutate a record + should use ``update_record`` which takes the lock and applies the + patch atomically. """ def __init__(self) -> None: - self.state_lock = threading.RLock() # Nested dict: source_name → {download_id → record}. Replaces # the original single-dict composite-key layout so # ``iter_records_for_source`` is O(source_records) instead of - # O(total_records). RLock so a plugin's worker callback can - # re-enter while holding the lock for its own update. + # O(total_records). self._records: Dict[str, Dict[str, DownloadRecord]] = {} + # Per-source RLocks. Each source gets its own so progress + # updates on one source never block writes on another. RLock + # so a plugin's worker callback can re-enter while holding the + # lock for its own update. Lazily created via ``_source_lock``; + # the meta-lock guards creation against the create-race window + # where two threads could both miss + both create. + self._source_locks: Dict[str, threading.RLock] = {} + self._source_locks_lock = threading.Lock() # Plugins that have registered with the engine. Source name # → plugin instance. self._plugins: Dict[str, Any] = {} @@ -155,6 +164,18 @@ class DownloadEngine: def registered_sources(self) -> List[str]: return list(self._plugins.keys()) + def _source_lock(self, source_name: str) -> threading.RLock: + """Return the per-source RLock, lazy-creating it on first use. + The meta-lock around the cache lookup closes the create-race + window where two threads both miss + both create a fresh lock. + """ + with self._source_locks_lock: + lock = self._source_locks.get(source_name) + if lock is None: + lock = threading.RLock() + self._source_locks[source_name] = lock + return lock + # ------------------------------------------------------------------ # Active-downloads state — Phase B core surface # ------------------------------------------------------------------ @@ -163,7 +184,7 @@ class DownloadEngine: """Insert a fresh download record. Used by clients (today directly via their own dicts; Phase B2 routes them through here).""" - with self.state_lock: + with self._source_lock(source_name): source_bucket = self._records.setdefault(source_name, {}) if download_id in source_bucket: logger.warning("Replacing existing download record for %s/%s", source_name, download_id) @@ -172,7 +193,7 @@ class DownloadEngine: def update_record(self, source_name: str, download_id: str, patch: DownloadRecord) -> None: """Apply a partial patch to an existing record. No-op if the record was already removed (e.g. cancelled mid-update).""" - with self.state_lock: + with self._source_lock(source_name): existing = self._records.get(source_name, {}).get(download_id) if existing is None: return @@ -189,10 +210,10 @@ class DownloadEngine: Used by the background download worker's ``_mark_terminal`` to avoid the read-then-write race Cin flagged: a cancel landing between the snapshot and update could be overwritten - back to Errored / Completed. Holding ``state_lock`` across + back to Errored / Completed. Holding the source's lock across the check + write closes the window. """ - with self.state_lock: + with self._source_lock(source_name): existing = self._records.get(source_name, {}).get(download_id) if existing is None: return False @@ -204,7 +225,7 @@ class DownloadEngine: def remove_record(self, source_name: str, download_id: str) -> Optional[DownloadRecord]: """Delete a record (cancellation cleanup). Returns the removed record or None if not found.""" - with self.state_lock: + with self._source_lock(source_name): source_bucket = self._records.get(source_name) if not source_bucket: return None @@ -218,20 +239,21 @@ class DownloadEngine: def get_record(self, source_name: str, download_id: str) -> Optional[DownloadRecord]: """Return a SHALLOW COPY of the record. Caller mutations don't affect engine state — use ``update_record`` for that.""" - with self.state_lock: + with self._source_lock(source_name): record = self._records.get(source_name, {}).get(download_id) return dict(record) if record is not None else None def iter_records_for_source(self, source_name: str) -> Iterator[DownloadRecord]: """Yield SHALLOW COPIES of every record owned by a source. - Holds the lock briefly to snapshot, then yields outside the - lock so callers can spend arbitrary time on each record. + Holds the source's lock briefly to snapshot, then yields + outside the lock so callers can spend arbitrary time on each + record. With the nested-dict layout this is O(source_records) — only touches the bucket for the requested source, not every record across every source. """ - with self.state_lock: + with self._source_lock(source_name): source_bucket = self._records.get(source_name, {}) snapshot = [dict(record) for record in source_bucket.values()] for record in snapshot: diff --git a/core/download_engine/worker.py b/core/download_engine/worker.py index 77503e14..54f36757 100644 --- a/core/download_engine/worker.py +++ b/core/download_engine/worker.py @@ -301,7 +301,7 @@ class BackgroundDownloadWorker: client used to hand-roll inside its thread worker. Uses ``update_record_unless_state`` so the check + write are - atomic under the engine's state_lock. Cin caught a race + atomic under the engine's per-source lock. Cin caught a race where a cancel landing between the read-snapshot + write could overwrite Cancelled back to Errored / Completed. """ diff --git a/tests/downloads/test_download_engine.py b/tests/downloads/test_download_engine.py index aa597fa8..e4e9cff0 100644 --- a/tests/downloads/test_download_engine.py +++ b/tests/downloads/test_download_engine.py @@ -120,6 +120,51 @@ def test_remove_record_returns_none_when_missing(): assert engine.remove_record('qobuz', 'never-existed') is None +def test_per_source_locks_dont_block_each_other(): + """Per JohnBaumb: each source must have its own lock so a long-held + write on one source doesn't block writes on another. Pre-refactor + each client owned its own download lock; the engine has to match + that semantic. + + Hold source-A's lock from one thread, then mutate source-B from + another thread. Source-B's mutation must complete promptly even + while source-A is locked. + """ + engine = DownloadEngine() + engine.add_record('youtube', 'yt-1', {'state': 'InProgress'}) + engine.add_record('tidal', 'td-1', {'state': 'InProgress'}) + + held = threading.Event() + release = threading.Event() + other_done = threading.Event() + + def hold_youtube_lock(): + with engine._source_lock('youtube'): + held.set() + release.wait(timeout=2.0) + + def update_tidal_while_youtube_held(): + held.wait(timeout=1.0) + engine.update_record('tidal', 'td-1', {'state': 'Completed, Succeeded'}) + other_done.set() + + holder = threading.Thread(target=hold_youtube_lock) + other = threading.Thread(target=update_tidal_while_youtube_held) + holder.start() + other.start() + + # Tidal write must complete even though YouTube's lock is held. + assert other_done.wait(timeout=1.0), ( + "Tidal mutation blocked by YouTube's lock — sources are not " + "independently shardable" + ) + + release.set() + holder.join() + other.join() + assert engine.get_record('tidal', 'td-1')['state'] == 'Completed, Succeeded' + + def test_remove_record_drops_empty_source_bucket(): """Per JohnBaumb: nested layout makes per-source iteration O(source_records). Removing the last record for a source must