diff --git a/core/reorganize_queue.py b/core/reorganize_queue.py index fc043933..6b50afb0 100644 --- a/core/reorganize_queue.py +++ b/core/reorganize_queue.py @@ -126,8 +126,13 @@ class ReorganizeQueue: a fake runner; production wires the real one in via :func:`set_runner`. """ - self._lock = threading.Lock() - self._wakeup = threading.Event() + # Single Condition variable owns both mutual exclusion and the + # idle-worker wait. Using a Condition (vs Lock + Event) closes a + # race where the worker could clear an event right after enqueue + # set it, causing the new item to sleep for the timeout window. + # cond.wait() releases the lock and re-acquires on notify, so + # state checks and waits are properly interleaved. + self._cond = threading.Condition() self._items: List[QueueItem] = [] # everything ever submitted (active + recent) self._runner = runner self._worker: Optional[threading.Thread] = None @@ -139,7 +144,7 @@ class ReorganizeQueue: """Inject the function that does the actual reorganize work. Web_server calls this once at startup with a closure over the injected dependencies (post-process fn, db, etc.).""" - with self._lock: + with self._cond: self._runner = runner def enqueue( @@ -161,7 +166,7 @@ class ReorganizeQueue: adding a duplicate. ``cancelled`` / ``done`` / ``failed`` items don't block re-enqueue (user retried after a failure). """ - with self._lock: + with self._cond: for existing in self._items: if existing.album_id == album_id and existing.status in ('queued', 'running'): return { @@ -182,7 +187,7 @@ class ReorganizeQueue: self._items.append(item) position = sum(1 for i in self._items if i.status == 'queued') self._ensure_worker() - self._wakeup.set() + self._cond.notify_all() logger.info( f"[Queue] Enqueued '{album_title}' (album_id={album_id}, " f"queue_id={item.queue_id}, position={position}, source={source or 'auto'})" @@ -199,6 +204,13 @@ class ReorganizeQueue: ``album_title``, ``artist_id``, ``artist_name``, ``source``). Dedupe still applies per-album-id. + Holds the queue lock for the entire batch so two things hold: + (1) the worker can't start draining mid-batch, and (2) duplicate + album_ids inside the same batch get deduped against each other, + not just against pre-existing items. Without (2), a fast runner + could finish the first copy before the loop reached the second + and both would enqueue. + Returns a tally dict ``{'enqueued': N, 'already_queued': M, 'total': len(items)}`` so the caller can report bulk results without doing the counting themselves. Used by the bulk @@ -207,25 +219,44 @@ class ReorganizeQueue: """ enqueued = 0 already = 0 - for item in items: - result = self.enqueue( - album_id=str(item['album_id']), - album_title=item.get('album_title') or 'Unknown Album', - artist_id=str(item['artist_id']) if item.get('artist_id') is not None else None, - artist_name=item.get('artist_name') or 'Unknown Artist', - source=item.get('source'), - ) - if result['queued']: + seen_in_batch: set = set() + with self._cond: + # Snapshot album_ids that already block re-enqueue so we don't + # rescan self._items per row. + blocked = { + i.album_id for i in self._items if i.status in ('queued', 'running') + } + for raw in items: + album_id = str(raw['album_id']) + if album_id in blocked or album_id in seen_in_batch: + already += 1 + continue + seen_in_batch.add(album_id) + item = QueueItem( + queue_id=uuid.uuid4().hex[:12], + album_id=album_id, + album_title=raw.get('album_title') or 'Unknown Album', + artist_id=str(raw['artist_id']) if raw.get('artist_id') is not None else None, + artist_name=raw.get('artist_name') or 'Unknown Artist', + source=raw.get('source'), + enqueued_at=time.time(), + ) + self._items.append(item) enqueued += 1 - elif result.get('reason') == 'already_queued': - already += 1 + logger.info( + f"[Queue] Bulk-enqueued '{item.album_title}' (album_id={album_id}, " + f"queue_id={item.queue_id}, source={item.source or 'auto'})" + ) + if enqueued: + self._ensure_worker() + self._cond.notify_all() return {'enqueued': enqueued, 'already_queued': already, 'total': len(items)} def cancel(self, queue_id: str) -> dict: """Cancel a queued item. The currently-running item cannot be cancelled (Python threads aren't cleanly killable; post-process may have spawned ffmpeg).""" - with self._lock: + with self._cond: for item in self._items: if item.queue_id != queue_id: continue @@ -243,7 +274,7 @@ class ReorganizeQueue: """Cancel ALL queued items (running item continues). Returns the count of items cancelled.""" cancelled = 0 - with self._lock: + with self._cond: now = time.time() for item in self._items: if item.status == 'queued': @@ -264,7 +295,7 @@ class ReorganizeQueue: 'totals': {'queued': N, 'running': M, 'done_today': K, ...}, } """ - with self._lock: + with self._cond: active = next((i for i in self._items if i.status == 'running'), None) queued = [i for i in self._items if i.status == 'queued'] recent = [i for i in self._items if i.status in ('done', 'failed', 'cancelled')] @@ -286,14 +317,15 @@ class ReorganizeQueue: def stop(self) -> None: """Stop the worker (called on server shutdown).""" - self._stopped = True - self._wakeup.set() + with self._cond: + self._stopped = True + self._cond.notify_all() # -- internals --------------------------------------------------- def _ensure_worker(self) -> None: """Lazy worker start — only spawn the thread when there's - actually something to process. Caller MUST hold ``_lock``.""" + actually something to process. Caller MUST hold ``_cond``.""" if self._worker is not None and self._worker.is_alive(): return self._worker = threading.Thread( @@ -301,29 +333,37 @@ class ReorganizeQueue: ) self._worker.start() - def _next_queued(self) -> Optional[QueueItem]: - with self._lock: - for item in self._items: - if item.status == 'queued': - return item + def _claim_next_or_wait(self) -> Optional[QueueItem]: + """Atomically pick the next queued item AND flip it to 'running' + under a single lock acquisition. If the queue is empty, block + on ``_cond.wait()`` (which releases the lock while sleeping) + and return None when we're notified or timeout. Returning the + item already-marked-running closes the cancel-vs-run race: a + cancel() call now sees status='running' and is rejected.""" + with self._cond: + while not self._stopped: + for item in self._items: + if item.status == 'queued': + item.status = 'running' + item.started_at = time.time() + return item + # No queued items — wait for an enqueue or shutdown. + # 60s timeout so a stuck notify (shouldn't happen, but + # defensive) doesn't park the worker forever. + self._cond.wait(timeout=60) return None def _run(self) -> None: """Worker loop: pull next queued, run it, mark done, repeat. - Idles on `_wakeup` event when queue is empty.""" + Idles on `_cond.wait()` when queue is empty.""" logger.info("[Queue] Worker thread started") while not self._stopped: - item = self._next_queued() + item = self._claim_next_or_wait() if item is None: - # Idle — wait for next enqueue (with a long timeout so - # the thread can exit gracefully on shutdown). - self._wakeup.clear() - self._wakeup.wait(timeout=60) + # Only happens on shutdown — `_claim_next_or_wait` only + # returns None once `_stopped` is True. Loop back to the + # `while not self._stopped` check, which exits. continue - - with self._lock: - item.status = 'running' - item.started_at = time.time() logger.info(f"[Queue] Starting '{item.album_title}' (queue_id={item.queue_id})") try: @@ -336,13 +376,13 @@ class ReorganizeQueue: f"[Queue] Runner raised for '{item.album_title}': {e}", exc_info=True, ) - with self._lock: + with self._cond: item.status = 'failed' item.error = str(e) item.finished_at = time.time() continue - with self._lock: + with self._cond: item.moved = int(summary.get('moved', 0)) item.skipped = int(summary.get('skipped', 0)) item.failed = int(summary.get('failed', 0)) @@ -371,7 +411,7 @@ class ReorganizeQueue: # currently-running item. Safe to call from worker thread inside # reorganize_album's on_progress callback. def update_active_progress(self, *, queue_id: str, **fields) -> None: - with self._lock: + with self._cond: for item in self._items: if item.queue_id == queue_id and item.status == 'running': if 'current_track' in fields: diff --git a/database/music_database.py b/database/music_database.py index 7aea3df7..d4f68f0a 100644 --- a/database/music_database.py +++ b/database/music_database.py @@ -4800,34 +4800,31 @@ class MusicDatabase: Used by the reorganize queue enqueue endpoint to capture display strings at submission time so the status panel can render - without a DB lookup per poll. Returns None if the album does - not exist. + without a DB lookup per poll. Returns None when the album row + does not exist; lets DB errors bubble up so callers can surface + a real failure instead of swallowing it as "album not found". """ - try: - with self._get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - """ - SELECT al.title AS album_title, - ar.id AS artist_id, - ar.name AS artist_name - FROM albums al - JOIN artists ar ON al.artist_id = ar.id - WHERE al.id = ? - """, - (str(album_id),), - ) - row = cursor.fetchone() - if not row: - return None - return { - 'album_title': row['album_title'] or 'Unknown Album', - 'artist_id': str(row['artist_id']) if row['artist_id'] is not None else None, - 'artist_name': row['artist_name'] or 'Unknown Artist', - } - except Exception as e: - logger.error(f"Album display-meta fetch failed for {album_id}: {e}") - return None + with self._get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + """ + SELECT al.title AS album_title, + ar.id AS artist_id, + ar.name AS artist_name + FROM albums al + JOIN artists ar ON al.artist_id = ar.id + WHERE al.id = ? + """, + (str(album_id),), + ) + row = cursor.fetchone() + if not row: + return None + return { + 'album_title': row['album_title'] or 'Unknown Album', + 'artist_id': str(row['artist_id']) if row['artist_id'] is not None else None, + 'artist_name': row['artist_name'] or 'Unknown Artist', + } def get_artist_albums_for_reorganize(self, artist_id) -> List[Dict[str, Any]]: """Return ``[{album_id, album_title, artist_id, artist_name}, ...]`` @@ -4835,28 +4832,25 @@ class MusicDatabase: title. Used by the bulk Reorganize-All endpoint to pull the full tracklist server-side instead of trusting whatever the frontend cached. Returns an empty list when the artist has no - albums or when the query errors. + albums; lets DB errors bubble so a real failure surfaces as a + 500 rather than masquerading as "no albums found". """ - try: - with self._get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - """ - SELECT al.id AS album_id, - al.title AS album_title, - ar.id AS artist_id, - ar.name AS artist_name - FROM albums al - JOIN artists ar ON al.artist_id = ar.id - WHERE ar.id = ? - ORDER BY al.year ASC, al.title ASC - """, - (str(artist_id),), - ) - return [dict(r) for r in cursor.fetchall()] - except Exception as e: - logger.error(f"Artist albums (reorganize) fetch failed for {artist_id}: {e}") - return [] + with self._get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + """ + SELECT al.id AS album_id, + al.title AS album_title, + ar.id AS artist_id, + ar.name AS artist_name + FROM albums al + JOIN artists ar ON al.artist_id = ar.id + WHERE ar.id = ? + ORDER BY al.year ASC, al.title ASC + """, + (str(artist_id),), + ) + return [dict(r) for r in cursor.fetchall()] def get_albums_by_artist(self, artist_id: int) -> List[DatabaseAlbum]: """Get all albums by artist ID""" diff --git a/tests/test_reorganize_db_methods.py b/tests/test_reorganize_db_methods.py index 26e7518e..27e4609d 100644 --- a/tests/test_reorganize_db_methods.py +++ b/tests/test_reorganize_db_methods.py @@ -189,3 +189,25 @@ def test_get_artist_albums_for_reorganize_isolates_by_artist(db): ]) rows = db.get_artist_albums_for_reorganize('ar-1') assert {r['album_id'] for r in rows} == {'alb-1', 'alb-3'} + + +# ── error propagation ──────────────────────────────────────────────────── +# Regression for review feedback on the original PR: helpers used to +# swallow every Exception and return None / [], so a real DB outage +# masqueraded as "album not found" / "no albums". Now they let the +# error bubble — the route layer turns it into a 500 — so the user sees +# a real failure instead of a phantom empty state. + + +def test_get_album_display_meta_propagates_db_errors(db): + """If the underlying tables don't exist, the helper must raise + rather than swallow it as a missing-album result.""" + # Don't seed — the schema is empty, so the SELECT will fail with + # OperationalError ("no such table: albums"). + with pytest.raises(sqlite3.OperationalError): + db.get_album_display_meta('alb-1') + + +def test_get_artist_albums_for_reorganize_propagates_db_errors(db): + with pytest.raises(sqlite3.OperationalError): + db.get_artist_albums_for_reorganize('ar-1') diff --git a/tests/test_reorganize_queue.py b/tests/test_reorganize_queue.py index eae32659..8e2707b4 100644 --- a/tests/test_reorganize_queue.py +++ b/tests/test_reorganize_queue.py @@ -399,6 +399,71 @@ def test_enqueue_many_handles_empty_list(queue): assert queue.enqueue_many([]) == {'enqueued': 0, 'already_queued': 0, 'total': 0} +def test_enqueue_many_dedupes_batch_internal_duplicates(queue): + """Same album_id appearing twice in the same bulk request must be + deduped against each other — not just against pre-existing items. + Regression for the race where a fast runner finishes the first copy + before the loop reaches the second, letting both slip through.""" + record = [] + queue.set_runner(_make_runner(record)) + items = [ + {'album_id': 'alb-x', 'album_title': 'X', 'artist_id': 'ar-1', 'artist_name': 'A'}, + {'album_id': 'alb-y', 'album_title': 'Y', 'artist_id': 'ar-1', 'artist_name': 'A'}, + {'album_id': 'alb-x', 'album_title': 'X (dup)', 'artist_id': 'ar-1', 'artist_name': 'A'}, + ] + result = queue.enqueue_many(items) + assert result == {'enqueued': 2, 'already_queued': 1, 'total': 3} + # Wait for the queue to drain, then give the worker a moment to + # try (and fail) to pick a phantom third item. If the dedupe leaked, + # a third runner call would land here. + assert _wait_for(lambda: queue.snapshot()['active'] is None and not queue.snapshot()['queued']) + time.sleep(0.05) + assert len(record) == 2 + + +def test_cancel_and_run_are_mutually_exclusive(queue): + """Regression for kettui's ``_next_queued() → status flip`` race: + a successfully-cancelled item must NEVER have its runner invoked. + With the old non-atomic pick + flip, cancel could land between + the worker's pick and its flip-to-running, leaving the item + marked 'cancelled' but the worker still runs it. + + Hammers many enqueue-then-immediately-cancel pairs to exercise the + race window. After draining, every queue_id whose cancel returned + ``cancelled: True`` must NOT appear in the runner's record.""" + runner_called: set = set() + runner_lock = threading.Lock() + + def runner(item): + with runner_lock: + runner_called.add(item.queue_id) + # Slight runtime widens the window where overlapping cancels + # could (incorrectly) fire on a running item. + time.sleep(0.002) + return { + 'status': 'completed', 'source': 'spotify', + 'total': 1, 'moved': 1, 'skipped': 0, 'failed': 0, 'errors': [], + } + + queue.set_runner(runner) + + successful_cancels: set = set() + for i in range(50): + r = _enqueue(queue, album_id=f'alb-race-{i}') + # Immediately try to cancel — half will land while item is still + # 'queued', half will land after worker has flipped to 'running'. + if queue.cancel(r['queue_id'])['cancelled']: + successful_cancels.add(r['queue_id']) + + assert _wait_for( + lambda: queue.snapshot()['active'] is None and not queue.snapshot()['queued'], + timeout=5.0, + ) + + leaked = successful_cancels & runner_called + assert not leaked, f"Runner ran for cancelled items: {leaked}" + + def test_no_runner_marks_item_failed(queue): """If the worker pulls an item but no runner has been set, the item must be marked failed (not silently dropped). In practice diff --git a/web_server.py b/web_server.py index 219223fb..7ba8afc0 100644 --- a/web_server.py +++ b/web_server.py @@ -22777,6 +22777,17 @@ def get_version_info(): "title": "What's New in SoulSync", "subtitle": f"Version {SOULSYNC_VERSION} — Latest Changes", "sections": [ + { + "title": "Reorganize Queue: Race-Condition Hardening (kettui Review)", + "description": "Three concurrency / dedupe issues kettui caught in his review of PR #377, plus two related polish items from the same pass.", + "features": [ + "• Worker pick + status flip is now atomic — fixes a window where a cancel() landing between 'pick next queued' and 'flip to running' could mark an item cancelled but the worker still ran it", + "• Replaced the lock + wakeup-event pair with a single threading.Condition so newly-queued items can't sleep up to 60s waiting for the next wakeup tick (the old pair had an empty-check / clear-event race)", + "• enqueue_many now holds the queue lock for the whole batch and tracks a per-batch seen set, so duplicate album_ids inside one bulk call are deduped against each other (not just against pre-existing items)", + "• Reorganize-preview Apply button no longer gets stuck disabled when an early return / network error skipped the re-enable line — moved into a finally", + "• DB helpers get_album_display_meta and get_artist_albums_for_reorganize now let exceptions bubble instead of swallowing them as 'not found' / empty list — a real DB outage now surfaces as a 500 to the user instead of looking like a missing album", + ], + }, { "title": "Reorganize Queue with Live Status Panel", "description": "Reorganizing albums is no longer a foreground operation that locks the page. Click → enqueue → keep working. A status panel surfaces live progress.", diff --git a/webui/static/helper.js b/webui/static/helper.js index 46dd4703..31463231 100644 --- a/webui/static/helper.js +++ b/webui/static/helper.js @@ -3443,6 +3443,7 @@ const WHATS_NEW = { '2.40': [ // --- Search & Artists unification (in progress, not yet released) --- { date: 'Unreleased — Search & Artists unification', unreleased: true }, + { title: 'Reorganize Queue: Race-Condition Hardening (kettui Review)', desc: 'kettui\'s review of PR #377 caught two real concurrency bugs in the new reorganize queue and one input-deduplication gap. (1) Worker race: the worker thread looked up the next queued item, then released the lock, then re-acquired it to flip status to "running". A cancel() landing in that window would mark the item cancelled but the worker still ran it. Now picks and flips atomically under a single lock acquisition. (2) Wakeup race: the worker cleared its wakeup event after observing an empty queue, but enqueue could fire its wakeup.set() between the empty check and the clear, making a freshly-queued album sleep up to 60 seconds before the worker noticed. Replaced the lock + event pair with a single threading.Condition so check-and-wait happen under the same lock atomically. (3) Bulk-enqueue dedupe: enqueue_many called single-item enqueue in a loop, so two copies of the same album_id in one bulk request could both slip through if the worker finished the first copy before the loop reached the second. Now holds the queue lock for the entire batch and tracks a per-batch seen set, so intra-batch duplicates are deduped against each other, not just against pre-existing items. Also fixed two related issues from the same review: the reorganize-preview Apply button could get stuck disabled when an early return / network error skipped the re-enable line (moved into a finally), and the new DB helpers (get_album_display_meta, get_artist_albums_for_reorganize) used to swallow every exception and return None / [], which made a real DB outage look like "album not found" — they now let exceptions bubble so the route layer surfaces a proper 500', page: 'library', unreleased: true }, { title: 'Reorganize Queue with Live Status Panel', desc: 'Reorganizing albums no longer locks up the page or runs as a JS-driven loop. Each click on the per-album reorganize button — or "Reorganize All" — now enqueues into a single FIFO queue that a backend worker drains one item at a time. Buttons stay clickable: spam-clicking the same album silently dedupes, and you can keep browsing while items run. A status panel mounted at the top of the artist actions bar shows what\'s active (with a progress bar, current track, and live moved/skipped/failed counts), how many items are queued behind it, and recently-finished items with success/warning indicators. The panel expands to show the full queue with per-item cancel buttons (running items can\'t be cancelled mid-flight; queued ones can) and a "Cancel All" button for the queued tail. Items belonging to a different artist than the page you\'re on are flagged with a "(other artist)" hint so you understand what you\'re seeing. Bonus: "Reorganize All" is now one backend call instead of N JS-driven calls — much faster, and the artist context is captured server-side per item so the queue can show cross-artist progress correctly. Also retired the old single-slot status endpoint and the polling loop that depended on it', page: 'library', unreleased: true }, { title: 'Fix Album Completeness Job Reporting Zero Findings for Everyone', desc: 'sassmastawillis reported the Album Completeness maintenance job was finishing in 0.1s with 0 findings, even for users with obviously-incomplete albums. Root cause: the job used `albums.track_count` as the "expected total" to compare against the library\'s actual count. But `track_count` is populated by server syncs (Plex leafCount, SoulSync standalone len(tracks)) — it\'s always the OBSERVED count, never what the metadata provider says the album should contain. So expected == actual always, and every album looked complete. Fix: new `api_track_count` column on the albums table, written only by metadata-source code paths (Spotify, iTunes, Deezer, and Discogs enrichment workers now populate it whenever they fetch album data, so it piggybacks on existing API calls instead of making new ones). Server syncs never touch this column, so it stays authoritative. The repair job uses it as the expected total; if an album somehow hasn\'t been enriched yet, the job falls back to a live API lookup and caches the result. For users with an already-enriched library, the first completeness scan after the upgrade is fast because the workers will have populated the column during normal enrichment cycles', page: 'library', unreleased: true }, { title: 'Library Reorganize: Reroute Through the Download Pipeline', desc: 'Reported by winecountrygames — using "Reorganize All" on a 3-disc Aerosmith deluxe collapsed it to a flat 1-disc layout, and on other albums it left half the tracks in their original location with no error or count of what was skipped. Root cause: the reorganize endpoint reinvented several wheels (its own template engine, its own disc-number resolution from file tags, its own sidecar sweep, its own collision detection) and each had drifted from the canonical post-processing path used by downloads. The reorganize-only logic read disc_number from file tags and silently defaulted to 1 on any failure, so a single tag-less file collapsed the whole album to single-disc. Tracks whose file paths didn\'t resolve on disk were silently skipped. Rewrote it to follow the import page\'s pattern: copy each file to a per-album staging folder under your download path, look up the canonical tracklist from your configured metadata source (Deezer / Spotify / iTunes / Discogs / Hydrabase) using the album\'s stored source IDs, then route each file through the same `_post_process_matched_download` function fresh downloads use — same template, same tagging, same multi-disc subfolder logic, same sidecar handling, same AcoustID verification. Albums with no stored source ID are reported back and skipped entirely (degrading silently to file tags is what caused the original bug). Tracks not in the source\'s catalog version (bonus tracks on a deluxe edition) are reported as skipped and left in place rather than force-fed wrong context. Files that don\'t resolve on disk are surfaced with the offending DB path so the UI can show them. The 230-line inline reorganize logic in web_server.py was extracted into core/library_reorganize.py — net -195 lines from the monolith, +13 unit tests for the new orchestrator. Frontend behavior change: the per-call template parameter in the reorganize modal is now ignored — reorganize uses your configured download template, matching the pipeline downloads use', page: 'library', unreleased: true }, diff --git a/webui/static/library.js b/webui/static/library.js index b44a4c54..55b1892c 100644 --- a/webui/static/library.js +++ b/webui/static/library.js @@ -6224,6 +6224,13 @@ async function loadReorganizePreview() { if (applyBtn) applyBtn.disabled = true; previewBody.innerHTML = '
Loading preview...
'; + // Final apply-button state: only enable when the preview actually + // produced movable tracks AND no collisions blocked it. Any error + // path or empty result keeps it disabled. We compute it as we go and + // commit it in finally so an early return / throw can't leave the + // button stuck disabled forever. + let canApply = false; + try { const chosenSource = document.getElementById('reorganize-source-select')?.value || ''; const response = await fetch(`/api/library/album/${_reorganizeAlbumId}/reorganize/preview`, { @@ -6303,11 +6310,12 @@ async function loadReorganizePreview() { previewBody.innerHTML = summary + html; - // Block apply if collisions exist - if (applyBtn) applyBtn.disabled = !hasChanges || hasCollisions; + canApply = hasChanges && !hasCollisions; } catch (error) { previewBody.innerHTML = `
Error: ${escapeHtml(error.message)}
`; + } finally { + if (applyBtn) applyBtn.disabled = !canApply; } }