mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
336 lines
12 KiB
336 lines
12 KiB
"""Regression tests for parallel singles-import processing.
|
|
|
|
Discord-reported (fresh.dumbledore + maintainer ack): the
|
|
``/api/import/singles/process`` endpoint processed staging files
|
|
sequentially in a Python ``for`` loop. Per-file work is dominated by
|
|
metadata search round-trips (Spotify/iTunes/Deezer), so a
|
|
multi-track manual import on a typical home network was painfully
|
|
slow. The maintainer acknowledged needing multiple workers.
|
|
|
|
These tests pin the new behaviour:
|
|
|
|
- The per-file worker function exists, returns a typed outcome
|
|
``(status, payload)``, and is safe to call concurrently from the
|
|
shared ThreadPoolExecutor.
|
|
- Successful files report ``("ok", final_title)`` so the route can
|
|
count them.
|
|
- Failed metadata resolution / bad files report ``("error", msg)``.
|
|
- A worker that raises an unexpected exception is caught by the
|
|
caller (the test verifies that behaviour through the route).
|
|
"""
|
|
|
|
import logging
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _restore_soulsync_logger_state():
    """Keep ``soulsync`` logger mutations from escaping this file's tests.

    Importing ``web_server`` calls ``utils.logging_config.setup_logging``
    at module-init time, which clears + re-installs handlers on the
    ``soulsync`` logger and pins its level to whatever the user's
    config said. That mutation leaks across tests in the same pytest
    process and broke
    ``test_library_reorganize_orchestrator::test_watchdog_warns_about_stuck_workers``,
    which runs later alphabetically and relies on caplog capturing
    ``soulsync.library_reorganize`` warnings via root-logger
    propagation (it saw ``caplog.records == []``).

    The handlers, level and propagate flag are captured before each
    test and put back afterwards, so the pollution stays scoped to
    this file's tests only.
    """
    target = logging.getLogger("soulsync")

    # Copy the handler list: the logger mutates its own list in place.
    handlers_before, level_before, propagate_before = (
        list(target.handlers),
        target.level,
        target.propagate,
    )

    try:
        yield
    finally:
        # Restore even when the test body raised.
        target.propagate = propagate_before
        target.setLevel(level_before)
        target.handlers = handlers_before
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Worker contract
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_worker_returns_error_for_missing_file(tmp_path) -> None:
    """A path that is absent on disk yields an ``("error", msg)`` outcome.

    The worker has to report the problem as a value with a
    user-readable message instead of raising; otherwise the executor's
    caller can't aggregate per-file results cleanly.
    """
    from web_server import _process_single_import_file

    missing_name = 'does-not-exist.mp3'
    status, message = _process_single_import_file({
        'full_path': str(tmp_path / missing_name),
        'filename': missing_name,
    })

    assert status == "error"
    assert "File not found" in message
|
|
|
|
|
|
def test_worker_returns_error_for_malformed_manual_match(tmp_path) -> None:
    """A manual match lacking source/id is rejected with a clear message.

    The file itself exists, so the only thing wrong with the request is
    the empty ``manual_match``; the worker must answer with an error
    outcome rather than crashing the resolver downstream.
    """
    from web_server import _process_single_import_file

    track_path = tmp_path / "track.mp3"
    track_path.write_bytes(b"fake")

    empty_match = {'source': '', 'id': ''}
    request = {
        'full_path': str(track_path),
        'filename': 'track.mp3',
        'manual_match': empty_match,
    }

    status, message = _process_single_import_file(request)

    assert status == "error"
    assert "Malformed manual match" in message
|
|
|
|
|
|
def test_worker_wraps_pipeline_exception_as_error(tmp_path) -> None:
    """An exception raised during resolution becomes ``("error", msg)``.

    The metadata resolver is patched to raise; the worker is expected
    to catch that and surface the exception text in its payload, so a
    single bad file doesn't take the whole batch down via the
    executor's caller path.
    """
    from web_server import _process_single_import_file

    track_path = tmp_path / "track.mp3"
    track_path.write_bytes(b"fake")

    request = {
        'full_path': str(track_path),
        'filename': 'track.mp3',
        'title': 'Some Song',
        'artist': 'Some Artist',
    }

    failing_resolver = patch(
        "core.imports.resolution.get_single_track_import_context",
        side_effect=RuntimeError("metadata service down"),
    )
    with failing_resolver:
        status, message = _process_single_import_file(request)

    assert status == "error"
    assert "metadata service down" in message
|
|
|
|
|
|
def test_worker_returns_ok_with_resolved_title(tmp_path) -> None:
    """Happy path: a successful pipeline yields ``("ok", final_title)``.

    Both the metadata resolver and the post-processing step are
    patched, so the test only pins what the worker hands back to the
    route (which uses the title for the activity feed message).
    """
    from web_server import _process_single_import_file

    track_path = tmp_path / "track.mp3"
    track_path.write_bytes(b"fake")

    request = {
        'full_path': str(track_path),
        'filename': 'track.mp3',
        'title': 'Resolved Title',
        'artist': 'Resolved Artist',
    }

    # Canned resolver result, built inside-out for readability.
    search_result = {
        'title': 'Resolved Title',
        'artist': 'Resolved Artist',
        'clean_title': 'Resolved Title',
        'clean_artist': 'Resolved Artist',
        'clean_album': '',
        'album': '',
    }
    resolved_context = {
        'artist': {'name': 'Resolved Artist'},
        'track_info': {'name': 'Resolved Title'},
        'album': {},
        'original_search_result': search_result,
    }
    fake_resolved = {'context': resolved_context, 'source': 'spotify'}

    resolver_patch = patch(
        "core.imports.resolution.get_single_track_import_context",
        return_value=fake_resolved,
    )
    pipeline_patch = patch("web_server._post_process_matched_download")

    with resolver_patch, pipeline_patch as ppm:
        ppm.return_value = None
        status, title = _process_single_import_file(request)

    assert status == "ok"
    assert title == "Resolved Title"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Executor wiring
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_import_singles_executor_uses_three_workers() -> None:
    """Pin the size of the singles-import pool at three workers.

    The user's report (and the maintainer's acknowledgement)
    specifically asked for parallelism; three workers balance
    throughput against per-source rate-limit pressure.
    """
    from web_server import import_singles_executor

    # Reads the executor's private attribute; there is no public accessor.
    configured_workers = import_singles_executor._max_workers
    assert configured_workers == 3
|
|
|
|
|
|
def test_import_singles_executor_threads_are_named_for_diagnostics() -> None:
    """Pin the thread-name prefix used by the singles-import pool.

    Named threads make crash logs and rate-limit diagnostics
    immediately attributable to this pool. Without a prefix, log lines
    from these workers look identical to the download workers and
    post-processing workers.
    """
    from web_server import import_singles_executor

    # Reads the executor's private attribute; there is no public accessor.
    prefix = import_singles_executor._thread_name_prefix
    assert prefix == "ImportSingleWorker"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# End-to-end route integration
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_route_processes_multiple_files_in_parallel(tmp_path) -> None:
    """End-to-end: the route fans a multi-file batch out across workers.

    Hits the actual ``/api/import/singles/process`` route with several
    files and asserts all of them ran. The worker stub sleeps briefly
    so a sequential run would be markedly slower than a 3-worker
    parallel run; the test pins parallelism by checking that the
    wall-clock duration is well under the sequential cost.
    """
    import time as _time

    # Single source of truth for the batch size, so the payload, the
    # response assertions and the timing bound cannot drift apart.
    file_count = 6

    audio_files = []
    for i in range(file_count):
        f = tmp_path / f"track_{i}.mp3"
        f.write_bytes(b"fake audio")
        audio_files.append(f)

    files_payload = [
        {
            'full_path': str(f),
            'filename': f.name,
            'title': f"Track {i}",
            'artist': "Test Artist",
        }
        for i, f in enumerate(audio_files)
    ]

    sleep_per_call = 0.3  # 6 files * 0.3s = 1.8s sequential, <0.7s with 3 workers

    def fake_worker(file_info):
        # Stand-in for the real per-file worker: only costs wall-clock time.
        _time.sleep(sleep_per_call)
        return ("ok", file_info.get('title', '?'))

    from web_server import app as flask_app
    flask_app.config['TESTING'] = True
    client = flask_app.test_client()

    with patch("web_server._process_single_import_file", side_effect=fake_worker):
        start = _time.monotonic()
        response = client.post(
            "/api/import/singles/process",
            json={'files': files_payload},
        )
        duration = _time.monotonic() - start

    assert response.status_code == 200
    payload = response.get_json()
    assert payload['success'] is True
    assert payload['processed'] == file_count
    assert payload['total'] == file_count
    assert payload['errors'] == []

    sequential_cost = sleep_per_call * file_count
    # Parallel run with 3 workers should finish in ~2 batches:
    # ceil(6 / 3) * 0.3 = 0.6s of sleep + Python overhead. Allow up
    # to 2/3 of the sequential cost as the upper bound.
    assert duration < sequential_cost * (2 / 3), (
        f"route did not parallelize — took {duration:.2f}s, "
        f"sequential would take ~{sequential_cost:.2f}s"
    )
|
|
|
|
|
|
def test_route_aggregates_mixed_success_and_error_outcomes(tmp_path) -> None:
    """Per-file errors must not abort the batch.

    The final response must list every error and report the success
    count separately. Pre-fix, an exception in any single file's
    pipeline would propagate up the for-loop's try/except — but the
    as_completed loop has its own per-future try/except that's worth
    pinning.
    """
    audio_files = []
    for i in range(4):
        f = tmp_path / f"track_{i}.mp3"
        f.write_bytes(b"fake")
        audio_files.append(f)

    files_payload = [
        {'full_path': str(f), 'filename': f.name, 'title': f"Track {i}", 'artist': 'A'}
        for i, f in enumerate(audio_files)
    ]

    def mixed_worker(file_info):
        # Files 0 and 2 succeed, 1 and 3 fail. The index is recovered
        # from the "track_<i>.mp3" filename, so the outcome does not
        # depend on the order in which the pool schedules the files.
        idx = int(file_info['filename'].split('_')[1].split('.')[0])
        if idx % 2 == 0:
            return ("ok", file_info['title'])
        return ("error", f"{file_info['title']}: simulated failure")

    from web_server import app as flask_app
    flask_app.config['TESTING'] = True
    client = flask_app.test_client()

    with patch("web_server._process_single_import_file", side_effect=mixed_worker):
        response = client.post(
            "/api/import/singles/process",
            json={'files': files_payload},
        )

    # Check the status before touching the body, matching the other
    # route tests; a failed request then reports as a status mismatch
    # instead of an opaque TypeError/KeyError on the payload lookups.
    assert response.status_code == 200
    payload = response.get_json()
    assert payload['processed'] == 2
    assert payload['total'] == 4
    assert len(payload['errors']) == 2
    assert all('simulated failure' in err for err in payload['errors'])
|
|
|
|
|
|
def test_route_recovers_from_worker_crash(tmp_path) -> None:
    """An unhandled worker exception must not 500 the whole batch.

    If a worker function raises (shouldn't happen — the worker wraps
    its own pipeline call — but defensive), the route must still
    finish and report the crash in the errors list.

    The crashing file is chosen by filename rather than by a shared
    call counter: the stub runs on several pool threads at once, and an
    unsynchronised "increment, then compare" can let zero or two calls
    observe the trigger value, which would make this test flaky.
    """
    audio_files = [tmp_path / f"track_{i}.mp3" for i in range(3)]
    for f in audio_files:
        f.write_bytes(b"fake")

    files_payload = [
        {'full_path': str(f), 'filename': f.name, 'title': f"T{i}", 'artist': 'A'}
        for i, f in enumerate(audio_files)
    ]

    # Exactly one of the three files crashes, regardless of scheduling.
    crash_filename = audio_files[1].name

    def crashing_worker(file_info):
        if file_info['filename'] == crash_filename:
            raise RuntimeError("worker boom")
        return ("ok", file_info['title'])

    from web_server import app as flask_app
    flask_app.config['TESTING'] = True
    client = flask_app.test_client()

    with patch("web_server._process_single_import_file", side_effect=crashing_worker):
        response = client.post(
            "/api/import/singles/process",
            json={'files': files_payload},
        )

    assert response.status_code == 200
    payload = response.get_json()
    assert payload['success'] is True
    assert payload['processed'] == 2  # The two non-crashing calls
    assert any('worker crashed' in err for err in payload['errors'])
|