|
|
"""Pin the bounded-executor + scan-lock concurrency model in
|
|
|
``AutoImportWorker``.
|
|
|
|
|
|
Pre-refactor (before 2026-05-09): manual "Scan Now" clicks spawned a
|
|
|
fresh `threading.Thread(target=_scan_cycle)` per click on top of the
|
|
|
worker's existing 60-second timer-driven scan. Emergent parallelism
|
|
|
with no upper bound, no shared queue, no graceful shutdown. Different
|
|
|
scan cycles raced on `_processing_paths` / `_folder_snapshots` state.
|
|
|
|
|
|
Post-refactor:
|
|
|
- ONE scan at a time (`_scan_lock` non-blocking acquire — duplicate
|
|
|
triggers no-op).
|
|
|
- Per-candidate processing runs on a `ThreadPoolExecutor` (default 3
|
|
|
workers, configurable via `auto_import.max_workers`).
|
|
|
- Both timer + manual triggers share `trigger_scan()` so they go
|
|
|
through the same lock + executor.
|
|
|
|
|
|
These tests pin the CONCURRENCY CONTRACT, not the per-candidate
|
|
|
processing logic (which is covered separately by
|
|
|
``test_auto_import_live_progress.py`` etc.).
|
|
|
"""
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
import threading
|
|
|
import time
|
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
from core.auto_import_worker import AutoImportWorker, FolderCandidate
|
|
|
|
|
|
|
|
|
def _make_worker(max_workers: int = 3) -> AutoImportWorker:
|
|
|
"""Bare worker — for the executor/lock tests we don't need full
|
|
|
db / config / process_callback dependencies."""
|
|
|
return AutoImportWorker(
|
|
|
database=MagicMock(),
|
|
|
process_callback=MagicMock(),
|
|
|
max_workers=max_workers,
|
|
|
)
|
|
|
|
|
|
|
|
|
def _make_candidate(folder_hash: str = 'h1', name: str = 'TestAlbum') -> FolderCandidate:
|
|
|
return FolderCandidate(
|
|
|
path=f'/staging/{name}',
|
|
|
name=name,
|
|
|
audio_files=[f'/staging/{name}/01.flac'],
|
|
|
folder_hash=folder_hash,
|
|
|
)
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
# Pool configuration
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
def test_default_max_workers_is_three():
|
|
|
"""Match the existing pool patterns in this codebase
|
|
|
(missing_download_executor, sync_executor, import_singles_executor
|
|
|
all default to 3)."""
|
|
|
w = _make_worker()
|
|
|
assert w._max_workers == 3
|
|
|
|
|
|
|
|
|
def test_max_workers_configurable_via_constructor():
|
|
|
w = _make_worker(max_workers=5)
|
|
|
assert w._max_workers == 5
|
|
|
|
|
|
|
|
|
def test_max_workers_floors_at_one():
|
|
|
"""0 or negative pool size would deadlock anything submitted —
|
|
|
floor at 1 so a misconfigured value still works."""
|
|
|
w = _make_worker(max_workers=0)
|
|
|
assert w._max_workers == 1
|
|
|
|
|
|
|
|
|
def test_max_workers_pulled_from_config_when_provided():
|
|
|
config = MagicMock()
|
|
|
config.get = MagicMock(side_effect=lambda key, default: 7 if key == 'auto_import.max_workers' else default)
|
|
|
w = AutoImportWorker(
|
|
|
database=MagicMock(),
|
|
|
process_callback=MagicMock(),
|
|
|
config_manager=config,
|
|
|
max_workers=3, # constructor default — overridden by config
|
|
|
)
|
|
|
assert w._max_workers == 7
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
# Scan lock — duplicate triggers no-op
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
def test_concurrent_triggers_only_one_scan_runs(monkeypatch):
|
|
|
"""Pre-refactor regression case: hitting "Scan Now" 5× in quick
|
|
|
succession used to spawn 5 parallel scan cycles. Post-refactor:
|
|
|
only one runs, the rest no-op via the non-blocking lock."""
|
|
|
w = _make_worker()
|
|
|
scan_count = 0
|
|
|
scan_started = threading.Event()
|
|
|
scan_can_finish = threading.Event()
|
|
|
|
|
|
def fake_scan_and_submit():
|
|
|
nonlocal scan_count
|
|
|
scan_count += 1
|
|
|
scan_started.set()
|
|
|
scan_can_finish.wait(timeout=5)
|
|
|
|
|
|
monkeypatch.setattr(w, '_scan_and_submit', fake_scan_and_submit)
|
|
|
|
|
|
# Fire 5 trigger_scan calls in parallel
|
|
|
threads = [threading.Thread(target=w.trigger_scan) for _ in range(5)]
|
|
|
for t in threads:
|
|
|
t.start()
|
|
|
|
|
|
# Wait for the first scan to start
|
|
|
assert scan_started.wait(timeout=5)
|
|
|
# The other 4 should have already returned (lock was held)
|
|
|
time.sleep(0.1)
|
|
|
assert scan_count == 1, (
|
|
|
f"Expected exactly 1 scan to run while the lock was held, got "
|
|
|
f"{scan_count}. The non-blocking scan lock isn't gating "
|
|
|
f"duplicate triggers."
|
|
|
)
|
|
|
|
|
|
# Release the held scan
|
|
|
scan_can_finish.set()
|
|
|
for t in threads:
|
|
|
t.join(timeout=5)
|
|
|
|
|
|
# No additional scans started after release (the 4 losers gave up,
|
|
|
# didn't queue)
|
|
|
assert scan_count == 1
|
|
|
|
|
|
|
|
|
def test_scan_after_previous_finishes_runs_normally(monkeypatch):
|
|
|
"""Lock releases when scan finishes — next trigger should acquire
|
|
|
+ run normally, not be permanently blocked."""
|
|
|
w = _make_worker()
|
|
|
scan_count = 0
|
|
|
|
|
|
def fake_scan_and_submit():
|
|
|
nonlocal scan_count
|
|
|
scan_count += 1
|
|
|
|
|
|
monkeypatch.setattr(w, '_scan_and_submit', fake_scan_and_submit)
|
|
|
|
|
|
w.trigger_scan()
|
|
|
w.trigger_scan()
|
|
|
w.trigger_scan()
|
|
|
|
|
|
assert scan_count == 3
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
# Executor — per-candidate parallelism
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
def test_candidates_dispatched_to_executor(monkeypatch):
|
|
|
"""Scan finds N candidates → submits N tasks to the executor pool.
|
|
|
Pool runs them in parallel (up to max_workers). Each task ends up
|
|
|
calling `_process_one_candidate`."""
|
|
|
w = _make_worker(max_workers=3)
|
|
|
w.start() # initialises the executor
|
|
|
|
|
|
try:
|
|
|
candidates = [
|
|
|
_make_candidate(folder_hash=f'h{i}', name=f'Album{i}')
|
|
|
for i in range(5)
|
|
|
]
|
|
|
monkeypatch.setattr(w, '_enumerate_folders', lambda staging: candidates)
|
|
|
monkeypatch.setattr(w, '_resolve_staging_path', lambda: '/staging')
|
|
|
monkeypatch.setattr('core.auto_import_worker.os.path.isdir', lambda p: True)
|
|
|
monkeypatch.setattr(w, '_is_already_processed', lambda h: False)
|
|
|
monkeypatch.setattr(w, '_is_folder_stable', lambda c: True)
|
|
|
|
|
|
processed = []
|
|
|
processed_lock = threading.Lock()
|
|
|
|
|
|
def fake_process(candidate):
|
|
|
with processed_lock:
|
|
|
processed.append(candidate.folder_hash)
|
|
|
|
|
|
monkeypatch.setattr(w, '_process_one_candidate', fake_process)
|
|
|
|
|
|
w.trigger_scan()
|
|
|
|
|
|
# Wait for all 5 to finish (executor runs async)
|
|
|
deadline = time.time() + 5
|
|
|
while len(processed) < 5 and time.time() < deadline:
|
|
|
time.sleep(0.05)
|
|
|
|
|
|
assert sorted(processed) == [f'h{i}' for i in range(5)]
|
|
|
finally:
|
|
|
w.stop()
|
|
|
|
|
|
|
|
|
def test_pool_runs_candidates_in_parallel():
|
|
|
"""With max_workers=3, the pool should run up to 3 candidates
|
|
|
concurrently — proves the bounded parallelism the user asked for."""
|
|
|
w = _make_worker(max_workers=3)
|
|
|
w.start()
|
|
|
try:
|
|
|
# Submit 3 long-running tasks directly to the executor and
|
|
|
# confirm they run concurrently.
|
|
|
in_flight = [0]
|
|
|
peak_in_flight = [0]
|
|
|
lock = threading.Lock()
|
|
|
proceed = threading.Event()
|
|
|
|
|
|
def slow_task():
|
|
|
with lock:
|
|
|
in_flight[0] += 1
|
|
|
if in_flight[0] > peak_in_flight[0]:
|
|
|
peak_in_flight[0] = in_flight[0]
|
|
|
proceed.wait(timeout=2)
|
|
|
with lock:
|
|
|
in_flight[0] -= 1
|
|
|
|
|
|
futures = [w._executor.submit(slow_task) for _ in range(3)]
|
|
|
# Give them a beat to start
|
|
|
time.sleep(0.2)
|
|
|
assert peak_in_flight[0] == 3, (
|
|
|
f"Expected 3 concurrent tasks, peaked at {peak_in_flight[0]}"
|
|
|
)
|
|
|
proceed.set()
|
|
|
for f in futures:
|
|
|
f.result(timeout=2)
|
|
|
finally:
|
|
|
w.stop()
|
|
|
|
|
|
|
|
|
def test_executor_max_workers_caps_concurrency():
|
|
|
"""max_workers=2 must NOT allow 3 concurrent tasks. Bounded
|
|
|
parallelism — predictable system load."""
|
|
|
w = _make_worker(max_workers=2)
|
|
|
w.start()
|
|
|
try:
|
|
|
in_flight = [0]
|
|
|
peak = [0]
|
|
|
lock = threading.Lock()
|
|
|
proceed = threading.Event()
|
|
|
|
|
|
def slow_task():
|
|
|
with lock:
|
|
|
in_flight[0] += 1
|
|
|
if in_flight[0] > peak[0]:
|
|
|
peak[0] = in_flight[0]
|
|
|
proceed.wait(timeout=2)
|
|
|
with lock:
|
|
|
in_flight[0] -= 1
|
|
|
|
|
|
futures = [w._executor.submit(slow_task) for _ in range(5)]
|
|
|
time.sleep(0.3)
|
|
|
assert peak[0] == 2, (
|
|
|
f"max_workers=2 should cap concurrency at 2, peaked at {peak[0]}"
|
|
|
)
|
|
|
proceed.set()
|
|
|
for f in futures:
|
|
|
f.result(timeout=2)
|
|
|
finally:
|
|
|
w.stop()
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
# Submitted-hashes dedup across triggers
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
def test_candidate_only_submitted_once_across_concurrent_scans(monkeypatch):
|
|
|
"""Scenario: scan A submits candidate X to the pool; pool worker
|
|
|
is mid-processing. Scan B (manual trigger) enumerates again and
|
|
|
sees X — must NOT re-submit. `_submitted_hashes` set + lock
|
|
|
prevents double-submission."""
|
|
|
w = _make_worker()
|
|
|
w.start()
|
|
|
|
|
|
try:
|
|
|
cand = _make_candidate(folder_hash='shared-hash')
|
|
|
monkeypatch.setattr(w, '_enumerate_folders', lambda staging: [cand])
|
|
|
monkeypatch.setattr(w, '_resolve_staging_path', lambda: '/staging')
|
|
|
monkeypatch.setattr('core.auto_import_worker.os.path.isdir', lambda p: True)
|
|
|
monkeypatch.setattr(w, '_is_already_processed', lambda h: False)
|
|
|
monkeypatch.setattr(w, '_is_folder_stable', lambda c: True)
|
|
|
|
|
|
process_count = 0
|
|
|
process_lock = threading.Lock()
|
|
|
process_can_finish = threading.Event()
|
|
|
|
|
|
def slow_process(candidate):
|
|
|
nonlocal process_count
|
|
|
with process_lock:
|
|
|
process_count += 1
|
|
|
process_can_finish.wait(timeout=5)
|
|
|
|
|
|
monkeypatch.setattr(w, '_process_one_candidate', slow_process)
|
|
|
|
|
|
# First scan submits the candidate
|
|
|
w.trigger_scan()
|
|
|
# Wait for processing to start
|
|
|
time.sleep(0.1)
|
|
|
|
|
|
# Second scan WHILE first is processing — must not re-submit
|
|
|
w.trigger_scan()
|
|
|
time.sleep(0.1)
|
|
|
assert process_count == 1, (
|
|
|
f"Expected only 1 process call (dedup active), got {process_count}"
|
|
|
)
|
|
|
|
|
|
process_can_finish.set()
|
|
|
time.sleep(0.2)
|
|
|
|
|
|
# After the first finishes, the candidate still has the same
|
|
|
# hash + would be `_is_already_processed`, but our mock returns
|
|
|
# False — even so, the post-finally `discard` should let a
|
|
|
# third trigger re-pick if needed. Here we just verify dedup
|
|
|
# held while in flight.
|
|
|
finally:
|
|
|
process_can_finish.set()
|
|
|
w.stop()
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
# Graceful shutdown
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
def test_stop_waits_for_inflight_pool_work():
|
|
|
"""`stop()` must call `executor.shutdown(wait=True)` so in-flight
|
|
|
file moves / tag writes / DB inserts complete before shutdown
|
|
|
reports done. Otherwise interrupted writes corrupt state."""
|
|
|
w = _make_worker()
|
|
|
w.start()
|
|
|
|
|
|
finished = threading.Event()
|
|
|
|
|
|
def slow_task():
|
|
|
time.sleep(0.3)
|
|
|
finished.set()
|
|
|
|
|
|
w._executor.submit(slow_task)
|
|
|
|
|
|
# Stop immediately — should block until slow_task completes
|
|
|
w.stop()
|
|
|
|
|
|
assert finished.is_set(), (
|
|
|
"stop() returned before in-flight pool work finished — "
|
|
|
"executor shutdown(wait=True) is missing or broken"
|
|
|
)
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
# Per-candidate state isolation under parallel pool workers
|
|
|
# ---------------------------------------------------------------------------
|
|
|
#
|
|
|
# Pre-refactor `_current_folder` / `_current_track_*` / `_current_status` were
|
|
|
# scalar fields on the worker. Three pool workers running in parallel would
|
|
|
# stomp each other's values — UI showed "Processing AlbumA, track 7/14:
|
|
|
# SongFromAlbumB" interleaved garbage. These tests pin the per-candidate
|
|
|
# isolation introduced by the `_active_imports` dict + `_active_lock`.
|
|
|
|
|
|
|
|
|
def test_concurrent_candidates_dont_stomp_each_other():
|
|
|
"""Two pool workers updating their own candidate state must not
|
|
|
interfere — each candidate's track_index / track_name / folder_name
|
|
|
is read back exactly as written for that hash."""
|
|
|
w = _make_worker(max_workers=2)
|
|
|
w.start()
|
|
|
try:
|
|
|
cand_a = _make_candidate(folder_hash='hA', name='AlbumA')
|
|
|
cand_b = _make_candidate(folder_hash='hB', name='AlbumB')
|
|
|
|
|
|
# Register both
|
|
|
w._register_active(cand_a, status='processing')
|
|
|
w._register_active(cand_b, status='processing')
|
|
|
|
|
|
ready = threading.Barrier(2)
|
|
|
done = threading.Event()
|
|
|
|
|
|
def worker_for(cand, name_prefix, total):
|
|
|
ready.wait(timeout=2)
|
|
|
for i in range(1, total + 1):
|
|
|
w._update_active(
|
|
|
cand.folder_hash,
|
|
|
track_index=i,
|
|
|
track_total=total,
|
|
|
track_name=f'{name_prefix}-track-{i}',
|
|
|
)
|
|
|
# Tight loop so the two threads interleave aggressively
|
|
|
time.sleep(0.001)
|
|
|
|
|
|
ta = threading.Thread(target=worker_for, args=(cand_a, 'A', 50))
|
|
|
tb = threading.Thread(target=worker_for, args=(cand_b, 'B', 50))
|
|
|
ta.start(); tb.start()
|
|
|
ta.join(timeout=5); tb.join(timeout=5)
|
|
|
done.set()
|
|
|
|
|
|
snap = w._snapshot_active()
|
|
|
by_hash = {a['folder_hash']: a for a in snap}
|
|
|
|
|
|
assert by_hash['hA']['folder_name'] == 'AlbumA', (
|
|
|
"Candidate A's folder_name was overwritten by a parallel candidate — "
|
|
|
f"got {by_hash['hA']['folder_name']!r}"
|
|
|
)
|
|
|
assert by_hash['hB']['folder_name'] == 'AlbumB', (
|
|
|
"Candidate B's folder_name was overwritten — "
|
|
|
f"got {by_hash['hB']['folder_name']!r}"
|
|
|
)
|
|
|
assert by_hash['hA']['track_index'] == 50
|
|
|
assert by_hash['hB']['track_index'] == 50
|
|
|
assert by_hash['hA']['track_name'].startswith('A-')
|
|
|
assert by_hash['hB']['track_name'].startswith('B-')
|
|
|
finally:
|
|
|
w.stop()
|
|
|
|
|
|
|
|
|
def test_get_status_returns_coherent_active_imports_array():
|
|
|
"""`get_status()` must return one entry per in-flight candidate
|
|
|
with the right per-candidate fields — the polling UI reads this
|
|
|
array to render multiple in-flight imports simultaneously."""
|
|
|
w = _make_worker(max_workers=3)
|
|
|
w.start()
|
|
|
try:
|
|
|
for i, name in enumerate(['One', 'Two', 'Three']):
|
|
|
cand = _make_candidate(folder_hash=f'h{i}', name=name)
|
|
|
w._register_active(cand, status='processing')
|
|
|
w._update_active(cand.folder_hash, track_index=i + 1, track_total=10)
|
|
|
|
|
|
status = w.get_status()
|
|
|
active = status.get('active_imports') or []
|
|
|
assert len(active) == 3
|
|
|
names = {a['folder_name'] for a in active}
|
|
|
assert names == {'One', 'Two', 'Three'}
|
|
|
|
|
|
# Aggregate top-level should be 'processing' (any active is
|
|
|
# processing → processing wins)
|
|
|
assert status['current_status'] == 'processing'
|
|
|
|
|
|
# Legacy single-import scalars: populated from the FIRST
|
|
|
# active entry (insertion order) so the existing UI keeps
|
|
|
# working when only one candidate is in flight.
|
|
|
assert status['current_folder'] == 'One'
|
|
|
assert status['current_track_index'] == 1
|
|
|
assert status['current_track_total'] == 10
|
|
|
finally:
|
|
|
w.stop()
|
|
|
|
|
|
|
|
|
def test_unregister_removes_only_that_candidate():
|
|
|
"""`_unregister_active(hash)` removes one entry; others stay
|
|
|
visible. Pool workers finishing in any order must not affect
|
|
|
other in-flight candidates' UI state."""
|
|
|
w = _make_worker()
|
|
|
w.start()
|
|
|
try:
|
|
|
for i, name in enumerate(['X', 'Y', 'Z']):
|
|
|
w._register_active(_make_candidate(folder_hash=f'k{i}', name=name))
|
|
|
|
|
|
w._unregister_active('k1')
|
|
|
snap = w._snapshot_active()
|
|
|
names = {a['folder_name'] for a in snap}
|
|
|
assert names == {'X', 'Z'}, f"Unexpected snapshot after unregister: {snap}"
|
|
|
finally:
|
|
|
w.stop()
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
# Stats counter integrity under parallel bumps
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
def test_stats_increments_are_thread_safe():
|
|
|
"""`self._stats[k] += 1` from multiple threads is read-modify-
|
|
|
write — under load the counters drift. `_bump_stat` wraps every
|
|
|
mutation in `_stats_lock` so 1000 parallel bumps land at 1000."""
|
|
|
w = _make_worker()
|
|
|
iterations = 200
|
|
|
threads_count = 5
|
|
|
expected = iterations * threads_count
|
|
|
|
|
|
def hammer():
|
|
|
for _ in range(iterations):
|
|
|
w._bump_stat('scanned')
|
|
|
|
|
|
threads = [threading.Thread(target=hammer) for _ in range(threads_count)]
|
|
|
for t in threads:
|
|
|
t.start()
|
|
|
for t in threads:
|
|
|
t.join(timeout=5)
|
|
|
|
|
|
assert w._stats['scanned'] == expected, (
|
|
|
f"Lost increments: expected {expected}, got {w._stats['scanned']}. "
|
|
|
f"Stats counter is not thread-safe."
|
|
|
)
|
|
|
|
|
|
|
|
|
def test_get_status_stats_snapshot_is_consistent():
|
|
|
"""`get_status()` reads stats under the same lock that mutations
|
|
|
use, so the returned snapshot can't show a partial mid-update
|
|
|
state. Verify the snapshot is a copy (not a live reference)."""
|
|
|
w = _make_worker()
|
|
|
w._bump_stat('scanned')
|
|
|
snap = w.get_status()['stats']
|
|
|
snap['scanned'] = 9999
|
|
|
# Mutating the snapshot must not affect the worker's internal stats
|
|
|
assert w._stats['scanned'] == 1, (
|
|
|
"get_status() returned a live reference to _stats — "
|
|
|
"callers can corrupt internal state."
|
|
|
)
|