mirror of https://github.com/Nezreka/SoulSync.git
First sub-PR in the download orchestrator series. Strict 1:1 lift — zero behavior change. What moved: - _record_sync_history_start → record_sync_history_start - _record_sync_history_completion → record_sync_history_completion - _detect_sync_source → detect_sync_source - Source prefix map → module-level _SOURCE_PREFIX_MAP constant What stayed: - web_server.py keeps three thin wrappers (_detect_sync_source, _record_sync_history_start, _record_sync_history_completion) that delegate into core/downloads/history.py. ~60 callers of these names in web_server.py keep resolving without touching every site. Each lifted function takes `database` as an arg (was `db = MusicDatabase()` inline). The wrappers construct `MusicDatabase()` per call to mirror the exact original behavior — each invocation got a fresh DB connection. Behavior parity: - Same SQL UPDATE statement (preserves the in-place update path when a sync_history entry already exists for the playlist_id) - Same JSON serialization with ensure_ascii=False - Same thumb URL extraction order (album_context.images → image_url → first track album.images) - Same per-track result shape (index, name, artist, album, image_url, duration_ms, source_track_id, status, confidence, matched_track, download_status) - Same status mapping (found/not_found, completed/failed) - Same best-effort exception swallowing (sync history failure must never break the actual download) - Reads `download_tasks` from core.runtime_state (already lifted by kettui in PR378) Tests: 34 new under tests/downloads/test_downloads_history.py covering source detection (16 prefixes), start happy paths + thumb extraction + duplicate-update + DB error swallowing, completion stats + per-track results JSON shape + edge cases. Full suite: 907 passing (was 873). Ruff clean.pull/394/head
parent
caf5ee9e98
commit
3ce25310a3
@ -0,0 +1,8 @@
|
||||
"""Download orchestrator helpers package.
|
||||
|
||||
Lifted from web_server.py download/sync orchestration code. Each module
|
||||
covers a discrete piece of the pipeline:
|
||||
|
||||
- history — sync_history table writes (start + completion)
|
||||
- (more arriving in subsequent PRs as the orchestrator gets carved up)
|
||||
"""
|
||||
@ -0,0 +1,204 @@
|
||||
"""Sync history recording.
|
||||
|
||||
Two write paths: `record_sync_history_start` runs when a batch is
|
||||
submitted (creates or updates a sync_history row), and
|
||||
`record_sync_history_completion` runs when a batch finishes (updates
|
||||
counts + per-track results). Plus `detect_sync_source` which derives
|
||||
the source label from the playlist_id prefix.
|
||||
|
||||
Every write is wrapped in a try/except — sync history is best-effort,
|
||||
a failure here must never break a real download.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
from core.runtime_state import download_tasks
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_SOURCE_PREFIX_MAP = [
|
||||
# Mirrored playlists go through YouTube discovery, so youtube_mirrored_ must be checked first
|
||||
('auto_mirror_', 'mirrored'), ('youtube_mirrored_', 'mirrored'),
|
||||
('youtube_', 'youtube'), ('beatport_', 'beatport'),
|
||||
('tidal_', 'tidal'), ('deezer_', 'deezer'), ('listenbrainz_', 'listenbrainz'),
|
||||
('spotify_public_', 'spotify_public'), ('discover_album_', 'discover'),
|
||||
('seasonal_album_', 'discover'), ('library_redownload_', 'library'),
|
||||
('issue_download_', 'library'), ('artist_album_', 'spotify'),
|
||||
('enhanced_search_', 'spotify'), ('spotify_library_', 'spotify'),
|
||||
('beatport_release_', 'beatport'), ('beatport_chart_', 'beatport'),
|
||||
('beatport_top100_', 'beatport'), ('beatport_hype100_', 'beatport'),
|
||||
('beatport_sync_', 'beatport'),
|
||||
]
|
||||
|
||||
|
||||
def detect_sync_source(playlist_id: str) -> str:
|
||||
"""Derive the sync source from the playlist_id prefix."""
|
||||
for prefix, source in _SOURCE_PREFIX_MAP:
|
||||
if playlist_id.startswith(prefix):
|
||||
return source
|
||||
if playlist_id == 'wishlist':
|
||||
return 'wishlist'
|
||||
return 'spotify'
|
||||
|
||||
|
||||
def record_sync_history_start(
|
||||
database,
|
||||
batch_id: str,
|
||||
playlist_id: str,
|
||||
playlist_name: str,
|
||||
tracks: list,
|
||||
is_album_download: bool,
|
||||
album_context,
|
||||
artist_context,
|
||||
playlist_folder_mode: bool,
|
||||
source_page=None,
|
||||
) -> None:
|
||||
"""Record a sync start to the database.
|
||||
|
||||
If a previous sync_history row exists for the same playlist_id, update
|
||||
it in place rather than creating a duplicate.
|
||||
"""
|
||||
try:
|
||||
source = detect_sync_source(playlist_id)
|
||||
if playlist_id == 'wishlist':
|
||||
sync_type = 'wishlist'
|
||||
elif is_album_download:
|
||||
sync_type = 'album'
|
||||
else:
|
||||
sync_type = 'playlist'
|
||||
|
||||
# Extract thumb URL from album context or first track
|
||||
thumb_url = None
|
||||
if album_context:
|
||||
images = album_context.get('images', [])
|
||||
if images and isinstance(images, list) and len(images) > 0:
|
||||
thumb_url = images[0].get('url') if isinstance(images[0], dict) else images[0]
|
||||
if not thumb_url:
|
||||
thumb_url = album_context.get('image_url')
|
||||
if not thumb_url and tracks:
|
||||
first_album = tracks[0].get('album', {})
|
||||
if isinstance(first_album, dict):
|
||||
imgs = first_album.get('images', [])
|
||||
if imgs and isinstance(imgs, list) and len(imgs) > 0:
|
||||
thumb_url = imgs[0].get('url') if isinstance(imgs[0], dict) else imgs[0]
|
||||
|
||||
# Check for existing entry with same playlist_id — update instead of duplicating
|
||||
existing = database.get_latest_sync_history_by_playlist(playlist_id)
|
||||
if existing:
|
||||
try:
|
||||
conn = database._get_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"""
|
||||
UPDATE sync_history
|
||||
SET batch_id = ?, playlist_name = ?, source = ?, sync_type = ?,
|
||||
tracks_json = ?, artist_context = ?, album_context = ?,
|
||||
thumb_url = ?, total_tracks = ?, is_album_download = ?,
|
||||
playlist_folder_mode = ?, source_page = ?, started_at = CURRENT_TIMESTAMP,
|
||||
completed_at = NULL, tracks_found = 0, tracks_downloaded = 0, tracks_failed = 0
|
||||
WHERE id = ?
|
||||
""",
|
||||
(batch_id, playlist_name, source, sync_type,
|
||||
json.dumps(tracks, ensure_ascii=False),
|
||||
json.dumps(artist_context, ensure_ascii=False) if artist_context else None,
|
||||
json.dumps(album_context, ensure_ascii=False) if album_context else None,
|
||||
thumb_url, len(tracks), int(is_album_download), int(playlist_folder_mode),
|
||||
source_page, existing['id']),
|
||||
)
|
||||
conn.commit()
|
||||
logger.info(f"Updated existing sync history entry {existing['id']} for '{playlist_name}'")
|
||||
return
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to update existing sync history, creating new: {e}")
|
||||
|
||||
database.add_sync_history_entry(
|
||||
batch_id=batch_id,
|
||||
playlist_id=playlist_id,
|
||||
playlist_name=playlist_name,
|
||||
source=source,
|
||||
sync_type=sync_type,
|
||||
tracks_json=json.dumps(tracks, ensure_ascii=False),
|
||||
artist_context=json.dumps(artist_context, ensure_ascii=False) if artist_context else None,
|
||||
album_context=json.dumps(album_context, ensure_ascii=False) if album_context else None,
|
||||
thumb_url=thumb_url,
|
||||
total_tracks=len(tracks),
|
||||
is_album_download=is_album_download,
|
||||
playlist_folder_mode=playlist_folder_mode,
|
||||
source_page=source_page,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to record sync history start: {e}")
|
||||
|
||||
|
||||
def record_sync_history_completion(database, batch_id: str, batch: dict) -> None:
|
||||
"""Update sync_history with completion stats + per-track results.
|
||||
|
||||
NOTE: Called from within tasks_lock context — does NOT acquire it here.
|
||||
Reads from `download_tasks` (also lock-protected by caller).
|
||||
"""
|
||||
try:
|
||||
analysis_results = batch.get('analysis_results', [])
|
||||
tracks_found = sum(1 for r in analysis_results if r.get('found'))
|
||||
queue = batch.get('queue', [])
|
||||
completed_count = 0
|
||||
failed_count = len(batch.get('permanently_failed_tracks', []))
|
||||
|
||||
# Build download status map: track_index → status
|
||||
download_status_map: dict = {}
|
||||
for task_id in queue:
|
||||
task = download_tasks.get(task_id, {})
|
||||
ti = task.get('track_index')
|
||||
if ti is not None:
|
||||
download_status_map[ti] = task.get('status', 'unknown')
|
||||
if task.get('status') == 'completed':
|
||||
completed_count += 1
|
||||
|
||||
# Build per-track results from analysis
|
||||
track_results = []
|
||||
for res in analysis_results:
|
||||
track_data = res.get('track', {})
|
||||
artists = track_data.get('artists', [])
|
||||
if artists:
|
||||
first = artists[0]
|
||||
artist_name = first.get('name', first) if isinstance(first, dict) else str(first)
|
||||
else:
|
||||
artist_name = ''
|
||||
|
||||
album = track_data.get('album', '')
|
||||
album_name = album.get('name', '') if isinstance(album, dict) else str(album or '')
|
||||
|
||||
# Extract image URL
|
||||
image_url = ''
|
||||
album_obj = track_data.get('album', {})
|
||||
if isinstance(album_obj, dict):
|
||||
imgs = album_obj.get('images', [])
|
||||
if imgs and isinstance(imgs, list) and len(imgs) > 0:
|
||||
image_url = imgs[0].get('url', '') if isinstance(imgs[0], dict) else ''
|
||||
|
||||
idx = res.get('track_index', 0)
|
||||
entry = {
|
||||
'index': idx,
|
||||
'name': track_data.get('name', ''),
|
||||
'artist': artist_name,
|
||||
'album': album_name,
|
||||
'image_url': image_url,
|
||||
'duration_ms': track_data.get('duration_ms', 0),
|
||||
'source_track_id': track_data.get('id', ''),
|
||||
'status': 'found' if res.get('found') else 'not_found',
|
||||
'confidence': round(res.get('confidence', 0.0), 3),
|
||||
'matched_track': None,
|
||||
'download_status': download_status_map.get(idx),
|
||||
}
|
||||
track_results.append(entry)
|
||||
|
||||
database.update_sync_history_completion(batch_id, tracks_found, completed_count, failed_count)
|
||||
|
||||
if track_results:
|
||||
database.update_sync_history_track_results(batch_id, json.dumps(track_results))
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to record sync history completion: {e}")
|
||||
@ -0,0 +1,379 @@
|
||||
"""Tests for core/downloads/history.py — sync history start/completion + source detection."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from core.downloads import history
|
||||
from core.runtime_state import download_tasks
|
||||
from database.music_database import MusicDatabase
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def db(tmp_path):
|
||||
return MusicDatabase(str(tmp_path / "music.db"))
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def clear_tasks():
|
||||
"""Each test gets a clean download_tasks dict."""
|
||||
download_tasks.clear()
|
||||
yield
|
||||
download_tasks.clear()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# detect_sync_source
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_detect_source_wishlist():
|
||||
assert history.detect_sync_source('wishlist') == 'wishlist'
|
||||
|
||||
|
||||
def test_detect_source_default_spotify():
|
||||
assert history.detect_sync_source('something_unknown') == 'spotify'
|
||||
|
||||
|
||||
def test_detect_source_youtube_prefix():
|
||||
assert history.detect_sync_source('youtube_abc123') == 'youtube'
|
||||
|
||||
|
||||
def test_detect_source_tidal_prefix():
|
||||
assert history.detect_sync_source('tidal_xyz') == 'tidal'
|
||||
|
||||
|
||||
def test_detect_source_deezer_prefix():
|
||||
assert history.detect_sync_source('deezer_xyz') == 'deezer'
|
||||
|
||||
|
||||
def test_detect_source_beatport_prefix():
|
||||
assert history.detect_sync_source('beatport_anything') == 'beatport'
|
||||
|
||||
|
||||
def test_detect_source_listenbrainz_prefix():
|
||||
assert history.detect_sync_source('listenbrainz_mbid') == 'listenbrainz'
|
||||
|
||||
|
||||
def test_detect_source_mirrored_auto_prefix():
|
||||
assert history.detect_sync_source('auto_mirror_pl1') == 'mirrored'
|
||||
|
||||
|
||||
def test_detect_source_youtube_mirrored_takes_precedence_over_youtube():
|
||||
"""Both prefixes match — youtube_mirrored_ must win."""
|
||||
assert history.detect_sync_source('youtube_mirrored_pl1') == 'mirrored'
|
||||
|
||||
|
||||
def test_detect_source_discover_album():
|
||||
assert history.detect_sync_source('discover_album_x') == 'discover'
|
||||
|
||||
|
||||
def test_detect_source_seasonal_album():
|
||||
assert history.detect_sync_source('seasonal_album_x') == 'discover'
|
||||
|
||||
|
||||
def test_detect_source_library():
|
||||
assert history.detect_sync_source('library_redownload_id') == 'library'
|
||||
|
||||
|
||||
def test_detect_source_issue_download():
|
||||
assert history.detect_sync_source('issue_download_id') == 'library'
|
||||
|
||||
|
||||
def test_detect_source_artist_album():
|
||||
assert history.detect_sync_source('artist_album_xyz') == 'spotify'
|
||||
|
||||
|
||||
def test_detect_source_enhanced_search():
|
||||
assert history.detect_sync_source('enhanced_search_xyz') == 'spotify'
|
||||
|
||||
|
||||
def test_detect_source_spotify_public():
|
||||
assert history.detect_sync_source('spotify_public_xyz') == 'spotify_public'
|
||||
|
||||
|
||||
def test_detect_source_beatport_release():
|
||||
assert history.detect_sync_source('beatport_release_x') == 'beatport'
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# record_sync_history_start — happy paths
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_start_records_basic_playlist(db):
|
||||
history.record_sync_history_start(
|
||||
db, batch_id='b1', playlist_id='spot_pl', playlist_name='My PL',
|
||||
tracks=[{'name': 't1'}, {'name': 't2'}],
|
||||
is_album_download=False, album_context=None, artist_context=None,
|
||||
playlist_folder_mode=False,
|
||||
)
|
||||
rows = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
assert rows is not None
|
||||
assert rows['batch_id'] == 'b1'
|
||||
assert rows['playlist_name'] == 'My PL'
|
||||
assert rows['source'] == 'spotify'
|
||||
assert rows['sync_type'] == 'playlist'
|
||||
assert rows['total_tracks'] == 2
|
||||
|
||||
|
||||
def test_start_album_sets_sync_type_album(db):
|
||||
history.record_sync_history_start(
|
||||
db, batch_id='b1', playlist_id='spot_pl', playlist_name='Alb',
|
||||
tracks=[{'name': 't1'}],
|
||||
is_album_download=True, album_context=None, artist_context=None,
|
||||
playlist_folder_mode=False,
|
||||
)
|
||||
row = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
assert row['sync_type'] == 'album'
|
||||
|
||||
|
||||
def test_start_wishlist_sets_sync_type_wishlist(db):
|
||||
history.record_sync_history_start(
|
||||
db, batch_id='b1', playlist_id='wishlist', playlist_name='Wishlist',
|
||||
tracks=[],
|
||||
is_album_download=False, album_context=None, artist_context=None,
|
||||
playlist_folder_mode=False,
|
||||
)
|
||||
row = db.get_latest_sync_history_by_playlist('wishlist')
|
||||
assert row['sync_type'] == 'wishlist'
|
||||
|
||||
|
||||
def test_start_pulls_thumb_from_album_context_images_list(db):
|
||||
history.record_sync_history_start(
|
||||
db, batch_id='b1', playlist_id='spot_pl', playlist_name='Alb',
|
||||
tracks=[],
|
||||
is_album_download=True,
|
||||
album_context={'images': [{'url': 'http://thumb.jpg'}]},
|
||||
artist_context=None, playlist_folder_mode=False,
|
||||
)
|
||||
row = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
assert row['thumb_url'] == 'http://thumb.jpg'
|
||||
|
||||
|
||||
def test_start_pulls_thumb_from_album_context_image_url_fallback(db):
|
||||
history.record_sync_history_start(
|
||||
db, batch_id='b1', playlist_id='spot_pl', playlist_name='Alb',
|
||||
tracks=[],
|
||||
is_album_download=True,
|
||||
album_context={'image_url': 'http://x.jpg'},
|
||||
artist_context=None, playlist_folder_mode=False,
|
||||
)
|
||||
row = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
assert row['thumb_url'] == 'http://x.jpg'
|
||||
|
||||
|
||||
def test_start_pulls_thumb_from_first_track_when_album_context_missing(db):
|
||||
history.record_sync_history_start(
|
||||
db, batch_id='b1', playlist_id='spot_pl', playlist_name='PL',
|
||||
tracks=[{'album': {'images': [{'url': 'http://track.jpg'}]}}],
|
||||
is_album_download=False, album_context=None, artist_context=None,
|
||||
playlist_folder_mode=False,
|
||||
)
|
||||
row = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
assert row['thumb_url'] == 'http://track.jpg'
|
||||
|
||||
|
||||
def test_start_no_thumb_anywhere_leaves_null(db):
|
||||
history.record_sync_history_start(
|
||||
db, batch_id='b1', playlist_id='spot_pl', playlist_name='PL',
|
||||
tracks=[], is_album_download=False,
|
||||
album_context=None, artist_context=None, playlist_folder_mode=False,
|
||||
)
|
||||
row = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
assert row['thumb_url'] is None
|
||||
|
||||
|
||||
def test_start_updates_existing_entry_for_same_playlist_id(db):
|
||||
history.record_sync_history_start(
|
||||
db, batch_id='b1', playlist_id='spot_pl', playlist_name='Original',
|
||||
tracks=[{'name': 'a'}], is_album_download=False,
|
||||
album_context=None, artist_context=None, playlist_folder_mode=False,
|
||||
)
|
||||
first_row = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
|
||||
history.record_sync_history_start(
|
||||
db, batch_id='b2', playlist_id='spot_pl', playlist_name='Renamed',
|
||||
tracks=[{'name': 'a'}, {'name': 'b'}, {'name': 'c'}], is_album_download=False,
|
||||
album_context=None, artist_context=None, playlist_folder_mode=False,
|
||||
)
|
||||
second_row = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
# Same row id (updated, not duplicated)
|
||||
assert second_row['id'] == first_row['id']
|
||||
assert second_row['batch_id'] == 'b2'
|
||||
assert second_row['playlist_name'] == 'Renamed'
|
||||
assert second_row['total_tracks'] == 3
|
||||
|
||||
|
||||
def test_start_swallows_db_error(db, monkeypatch):
|
||||
"""Best-effort: must not raise if DB write fails."""
|
||||
def boom(*a, **kw):
|
||||
raise RuntimeError("db dead")
|
||||
monkeypatch.setattr(db, 'add_sync_history_entry', boom)
|
||||
# Must not raise
|
||||
history.record_sync_history_start(
|
||||
db, batch_id='b1', playlist_id='new_pl', playlist_name='X',
|
||||
tracks=[], is_album_download=False,
|
||||
album_context=None, artist_context=None, playlist_folder_mode=False,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# record_sync_history_completion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _seed_start(db, batch_id='b1', playlist_id='spot_pl'):
|
||||
history.record_sync_history_start(
|
||||
db, batch_id=batch_id, playlist_id=playlist_id, playlist_name='PL',
|
||||
tracks=[], is_album_download=False,
|
||||
album_context=None, artist_context=None, playlist_folder_mode=False,
|
||||
)
|
||||
|
||||
|
||||
def test_completion_writes_counts(db):
|
||||
_seed_start(db)
|
||||
download_tasks['t1'] = {'track_index': 0, 'status': 'completed'}
|
||||
download_tasks['t2'] = {'track_index': 1, 'status': 'failed'}
|
||||
batch = {
|
||||
'queue': ['t1', 't2'],
|
||||
'analysis_results': [
|
||||
{'track_index': 0, 'found': True, 'confidence': 0.95, 'track': {'name': 'A'}},
|
||||
{'track_index': 1, 'found': False, 'confidence': 0.0, 'track': {'name': 'B'}},
|
||||
],
|
||||
'permanently_failed_tracks': ['t2'],
|
||||
}
|
||||
history.record_sync_history_completion(db, 'b1', batch)
|
||||
|
||||
row = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
assert row['tracks_found'] == 1
|
||||
assert row['tracks_downloaded'] == 1
|
||||
assert row['tracks_failed'] == 1
|
||||
|
||||
|
||||
def test_completion_per_track_results_json(db):
|
||||
_seed_start(db)
|
||||
download_tasks['t1'] = {'track_index': 0, 'status': 'completed'}
|
||||
batch = {
|
||||
'queue': ['t1'],
|
||||
'analysis_results': [{
|
||||
'track_index': 0,
|
||||
'found': True,
|
||||
'confidence': 0.876543,
|
||||
'track': {
|
||||
'name': 'Money',
|
||||
'artists': [{'name': 'Pink Floyd'}],
|
||||
'album': {'name': 'DSOTM', 'images': [{'url': 'http://thumb.jpg'}]},
|
||||
'duration_ms': 383000,
|
||||
'id': 'spotify:track:xyz',
|
||||
},
|
||||
}],
|
||||
'permanently_failed_tracks': [],
|
||||
}
|
||||
history.record_sync_history_completion(db, 'b1', batch)
|
||||
|
||||
row = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
track_results = json.loads(row['track_results'])
|
||||
assert len(track_results) == 1
|
||||
entry = track_results[0]
|
||||
assert entry['index'] == 0
|
||||
assert entry['name'] == 'Money'
|
||||
assert entry['artist'] == 'Pink Floyd'
|
||||
assert entry['album'] == 'DSOTM'
|
||||
assert entry['image_url'] == 'http://thumb.jpg'
|
||||
assert entry['duration_ms'] == 383000
|
||||
assert entry['source_track_id'] == 'spotify:track:xyz'
|
||||
assert entry['status'] == 'found'
|
||||
assert entry['confidence'] == 0.877 # rounded to 3
|
||||
assert entry['matched_track'] is None
|
||||
assert entry['download_status'] == 'completed'
|
||||
|
||||
|
||||
def test_completion_artist_string_form_normalized(db):
|
||||
_seed_start(db)
|
||||
download_tasks['t1'] = {'track_index': 0, 'status': 'completed'}
|
||||
batch = {
|
||||
'queue': ['t1'],
|
||||
'analysis_results': [{
|
||||
'track_index': 0, 'found': True, 'confidence': 1.0,
|
||||
'track': {'name': 'X', 'artists': ['Plain String Artist'], 'album': 'StringAlbum'},
|
||||
}],
|
||||
'permanently_failed_tracks': [],
|
||||
}
|
||||
history.record_sync_history_completion(db, 'b1', batch)
|
||||
row = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
entry = json.loads(row['track_results'])[0]
|
||||
assert entry['artist'] == 'Plain String Artist'
|
||||
assert entry['album'] == 'StringAlbum'
|
||||
|
||||
|
||||
def test_completion_no_artists_returns_empty_string(db):
|
||||
_seed_start(db)
|
||||
download_tasks['t1'] = {'track_index': 0, 'status': 'completed'}
|
||||
batch = {
|
||||
'queue': ['t1'],
|
||||
'analysis_results': [{
|
||||
'track_index': 0, 'found': True, 'confidence': 1.0,
|
||||
'track': {'name': 'X', 'artists': []},
|
||||
}],
|
||||
'permanently_failed_tracks': [],
|
||||
}
|
||||
history.record_sync_history_completion(db, 'b1', batch)
|
||||
row = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
entry = json.loads(row['track_results'])[0]
|
||||
assert entry['artist'] == ''
|
||||
|
||||
|
||||
def test_completion_unmatched_tracks_marked_not_found(db):
|
||||
_seed_start(db)
|
||||
batch = {
|
||||
'queue': [],
|
||||
'analysis_results': [{
|
||||
'track_index': 0, 'found': False, 'confidence': 0.0,
|
||||
'track': {'name': 'X'},
|
||||
}],
|
||||
'permanently_failed_tracks': [],
|
||||
}
|
||||
history.record_sync_history_completion(db, 'b1', batch)
|
||||
row = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
entry = json.loads(row['track_results'])[0]
|
||||
assert entry['status'] == 'not_found'
|
||||
|
||||
|
||||
def test_completion_swallows_db_error(db, monkeypatch):
|
||||
_seed_start(db)
|
||||
def boom(*a, **kw):
|
||||
raise RuntimeError("db dead")
|
||||
monkeypatch.setattr(db, 'update_sync_history_completion', boom)
|
||||
# Must not raise
|
||||
history.record_sync_history_completion(db, 'b1', {
|
||||
'queue': [], 'analysis_results': [], 'permanently_failed_tracks': [],
|
||||
})
|
||||
|
||||
|
||||
def test_completion_no_track_results_skips_track_results_write(db, monkeypatch):
|
||||
_seed_start(db)
|
||||
calls = []
|
||||
monkeypatch.setattr(db, 'update_sync_history_track_results',
|
||||
lambda *a, **kw: calls.append((a, kw)))
|
||||
history.record_sync_history_completion(db, 'b1', {
|
||||
'queue': [], 'analysis_results': [], 'permanently_failed_tracks': [],
|
||||
})
|
||||
assert calls == []
|
||||
|
||||
|
||||
def test_completion_download_status_map_falls_through_to_unknown(db):
|
||||
_seed_start(db)
|
||||
# Task exists in queue but no status field
|
||||
download_tasks['t1'] = {'track_index': 0}
|
||||
batch = {
|
||||
'queue': ['t1'],
|
||||
'analysis_results': [{
|
||||
'track_index': 0, 'found': True, 'confidence': 1.0,
|
||||
'track': {'name': 'X'},
|
||||
}],
|
||||
'permanently_failed_tracks': [],
|
||||
}
|
||||
history.record_sync_history_completion(db, 'b1', batch)
|
||||
row = db.get_latest_sync_history_by_playlist('spot_pl')
|
||||
entry = json.loads(row['track_results'])[0]
|
||||
assert entry['download_status'] == 'unknown'
|
||||
Loading…
Reference in new issue