Per-source lock sharding on the engine

Per JohnBaumb: the single state_lock serialized progress callbacks
across every source. Pre-refactor each client owned its own download
lock, so Deezer / YouTube / Tidal workers never blocked each other.
Multi-source concurrent downloads under the unified lock fought for
the same RLock on every progress update.

Replaced the engine-wide state_lock with per-source RLocks. Each
source gets its own lock, lazily created via _source_lock() on first
use (meta-lock guards the create-race). All record mutations
(add/update/update_unless_state/remove/get/iter) take only that
source's lock — Deezer progress updates no longer block Tidal writes.
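
For readers skimming the message, the lazy lock creation described above can be sketched in isolation. This is a minimal illustration, not the engine code: the `SourceLocks` class and its `get` method are hypothetical names used only here, standing in for the engine's `_source_lock()` and its meta-lock.

```python
import threading
from typing import Dict


class SourceLocks:
    """Hypothetical stand-in for the engine's per-source lock registry."""

    def __init__(self) -> None:
        # source_name -> RLock, filled in lazily on first use.
        self._locks: Dict[str, threading.RLock] = {}
        # Meta-lock: held only for the lookup/create step, so two threads
        # that both miss cannot each create a different lock.
        self._meta = threading.Lock()

    def get(self, source_name: str) -> threading.RLock:
        with self._meta:
            lock = self._locks.get(source_name)
            if lock is None:
                lock = threading.RLock()
                self._locks[source_name] = lock
            return lock


locks = SourceLocks()
# Repeated lookups for one source return the same lock object ...
same = locks.get("deezer") is locks.get("deezer")
# ... while different sources get independent locks.
distinct = locks.get("deezer") is not locks.get("tidal")
```

The meta-lock is a plain `Lock` because it is never re-entered; only the per-source locks need to be `RLock`s so a worker callback can re-enter while it already holds its own source's lock.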

Cancelled-preserve semantics still hold because cancel + worker
terminal write target the same source, so they share that source's
lock. New test pins lock independence: holding source-A's lock from
one thread does not block a write on source-B from another.
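
The cancelled-preserve argument can be illustrated with a small check-then-write sketch. Everything here is an assumption made for illustration: the `update_unless_state` helper, its signature, and the module-level `_locks` / `_records` dicts are hypothetical, since the real `update_record_unless_state` body is not shown in this diff.

```python
import threading
from typing import Any, Dict

# Hypothetical in-memory state: one RLock and one record bucket per source.
_locks: Dict[str, Any] = {"tidal": threading.RLock()}
_records: Dict[str, Dict[str, Dict[str, Any]]] = {
    "tidal": {"td-1": {"state": "InProgress"}},
}


def update_unless_state(source: str, download_id: str,
                        patch: Dict[str, Any], protected_state: str) -> bool:
    """Apply patch unless the record is missing or already in protected_state.

    The check and the write happen under the same per-source lock, so a
    cancel on this source cannot land between them.
    """
    with _locks[source]:
        existing = _records.get(source, {}).get(download_id)
        if existing is None or existing.get("state") == protected_state:
            return False
        existing.update(patch)
        return True


# The cancel lands first and takes the tidal lock for its write.
cancelled = update_unless_state("tidal", "td-1", {"state": "Cancelled"}, "Cancelled")
# The worker's terminal write then sees Cancelled and backs off.
overwritten = update_unless_state("tidal", "td-1", {"state": "Errored"}, "Cancelled")
final_state = _records["tidal"]["td-1"]["state"]
```

Because both calls name the same source, they contend for the same lock, which is why sharding by source does not reopen the race.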
pull/495/head
Broque Thomas 3 weeks ago
parent a5fde0502a
commit 2c19d7d1f2

@ -2,8 +2,8 @@
Phase B scope: skeleton only. The engine exposes a place for
plugins to register, a single ``active_downloads`` dict keyed by
``(source, download_id)``, and a ``state_lock`` that guards mutations
across the multi-threaded download worker pool.
``(source, download_id)``, and per-source RLocks that guard mutations
without serializing workers across different sources.
Subsequent phases bolt more capability on top:
- ``dispatch_download(plugin, target_id)`` (Phase C replaces every
@ -50,22 +50,31 @@ class DownloadEngine:
so the engine can answer "which plugin owns this download" in
O(1) without iterating every plugin).
Thread safety: every state mutation goes through ``state_lock``.
Read-only accessors (``get_record``, ``iter_records_for_source``)
take the lock briefly and return a SHALLOW COPY so the caller
can iterate without holding the lock. Callers that need to
mutate a record should use ``update_record`` which takes the
lock and applies the patch atomically.
Thread safety: per-source lock sharding. Each source gets its own
RLock, so progress callbacks on Deezer don't block Tidal's worker
and vice versa, matching the pre-refactor behavior where each
client owned its own download lock. Read-only accessors
(``get_record``, ``iter_records_for_source``) take the source's
lock briefly and return a SHALLOW COPY so the caller can iterate
without holding the lock. Callers that need to mutate a record
should use ``update_record`` which takes the lock and applies the
patch atomically.
"""
def __init__(self) -> None:
self.state_lock = threading.RLock()
# Nested dict: source_name → {download_id → record}. Replaces
# the original single-dict composite-key layout so
# ``iter_records_for_source`` is O(source_records) instead of
# O(total_records). RLock so a plugin's worker callback can
# re-enter while holding the lock for its own update.
# O(total_records).
self._records: Dict[str, Dict[str, DownloadRecord]] = {}
# Per-source RLocks. Each source gets its own so progress
# updates on one source never block writes on another. RLock
# so a plugin's worker callback can re-enter while holding the
# lock for its own update. Lazily created via ``_source_lock``;
# the meta-lock guards creation against the create-race window
# where two threads could both miss + both create.
self._source_locks: Dict[str, threading.RLock] = {}
self._source_locks_lock = threading.Lock()
# Plugins that have registered with the engine. Source name
# → plugin instance.
self._plugins: Dict[str, Any] = {}
@ -155,6 +164,18 @@ class DownloadEngine:
def registered_sources(self) -> List[str]:
return list(self._plugins.keys())
def _source_lock(self, source_name: str) -> threading.RLock:
"""Return the per-source RLock, lazy-creating it on first use.
The meta-lock around the cache lookup closes the create-race
window where two threads both miss + both create a fresh lock.
"""
with self._source_locks_lock:
lock = self._source_locks.get(source_name)
if lock is None:
lock = threading.RLock()
self._source_locks[source_name] = lock
return lock
# ------------------------------------------------------------------
# Active-downloads state — Phase B core surface
# ------------------------------------------------------------------
@ -163,7 +184,7 @@ class DownloadEngine:
"""Insert a fresh download record. Used by clients (today
directly via their own dicts; Phase B2 routes them through
here)."""
with self.state_lock:
with self._source_lock(source_name):
source_bucket = self._records.setdefault(source_name, {})
if download_id in source_bucket:
logger.warning("Replacing existing download record for %s/%s", source_name, download_id)
@ -172,7 +193,7 @@ class DownloadEngine:
def update_record(self, source_name: str, download_id: str, patch: DownloadRecord) -> None:
"""Apply a partial patch to an existing record. No-op if the
record was already removed (e.g. cancelled mid-update)."""
with self.state_lock:
with self._source_lock(source_name):
existing = self._records.get(source_name, {}).get(download_id)
if existing is None:
return
@ -189,10 +210,10 @@ class DownloadEngine:
Used by the background download worker's ``_mark_terminal``
to avoid the read-then-write race Cin flagged: a cancel
landing between the snapshot and update could be overwritten
back to Errored / Completed. Holding ``state_lock`` across
back to Errored / Completed. Holding the source's lock across
the check + write closes the window.
"""
with self.state_lock:
with self._source_lock(source_name):
existing = self._records.get(source_name, {}).get(download_id)
if existing is None:
return False
@ -204,7 +225,7 @@ class DownloadEngine:
def remove_record(self, source_name: str, download_id: str) -> Optional[DownloadRecord]:
"""Delete a record (cancellation cleanup). Returns the
removed record or None if not found."""
with self.state_lock:
with self._source_lock(source_name):
source_bucket = self._records.get(source_name)
if not source_bucket:
return None
@ -218,20 +239,21 @@ class DownloadEngine:
def get_record(self, source_name: str, download_id: str) -> Optional[DownloadRecord]:
"""Return a SHALLOW COPY of the record. Caller mutations
don't affect engine state — use ``update_record`` for that."""
with self.state_lock:
with self._source_lock(source_name):
record = self._records.get(source_name, {}).get(download_id)
return dict(record) if record is not None else None
def iter_records_for_source(self, source_name: str) -> Iterator[DownloadRecord]:
"""Yield SHALLOW COPIES of every record owned by a source.
Holds the lock briefly to snapshot, then yields outside the
lock so callers can spend arbitrary time on each record.
Holds the source's lock briefly to snapshot, then yields
outside the lock so callers can spend arbitrary time on each
record.
With the nested-dict layout this is O(source_records): it only
touches the bucket for the requested source, not every record
across every source.
"""
with self.state_lock:
with self._source_lock(source_name):
source_bucket = self._records.get(source_name, {})
snapshot = [dict(record) for record in source_bucket.values()]
for record in snapshot:

@ -301,7 +301,7 @@ class BackgroundDownloadWorker:
client used to hand-roll inside its thread worker.
Uses ``update_record_unless_state`` so the check + write are
atomic under the engine's state_lock. Cin caught a race
atomic under the engine's per-source lock. Cin caught a race
where a cancel landing between the read-snapshot + write
could overwrite Cancelled back to Errored / Completed.
"""

@ -120,6 +120,51 @@ def test_remove_record_returns_none_when_missing():
assert engine.remove_record('qobuz', 'never-existed') is None
def test_per_source_locks_dont_block_each_other():
"""Per JohnBaumb: each source must have its own lock so a long-held
write on one source doesn't block writes on another. Pre-refactor
each client owned its own download lock; the engine has to match
that semantic.
Hold source-A's lock from one thread, then mutate source-B from
another thread. Source-B's mutation must complete promptly even
while source-A is locked.
"""
engine = DownloadEngine()
engine.add_record('youtube', 'yt-1', {'state': 'InProgress'})
engine.add_record('tidal', 'td-1', {'state': 'InProgress'})
held = threading.Event()
release = threading.Event()
other_done = threading.Event()
def hold_youtube_lock():
with engine._source_lock('youtube'):
held.set()
release.wait(timeout=2.0)
def update_tidal_while_youtube_held():
held.wait(timeout=1.0)
engine.update_record('tidal', 'td-1', {'state': 'Completed, Succeeded'})
other_done.set()
holder = threading.Thread(target=hold_youtube_lock)
other = threading.Thread(target=update_tidal_while_youtube_held)
holder.start()
other.start()
# Tidal write must complete even though YouTube's lock is held.
assert other_done.wait(timeout=1.0), (
"Tidal mutation blocked by YouTube's lock — sources are not "
"independently shardable"
)
release.set()
holder.join()
other.join()
assert engine.get_record('tidal', 'td-1')['state'] == 'Completed, Succeeded'
def test_remove_record_drops_empty_source_bucket():
"""Per JohnBaumb: nested layout makes per-source iteration
O(source_records). Removing the last record for a source must
