Player revamp Phase 0a: extract radio selection into testable core/radio/

First step of the stream/player/radio revamp (see revamp_plan.md). The radio
algorithm lived inline inside database.music_database.get_radio_tracks as raw
SQL tangled with selection logic — untestable without a live DB (which also
throws in the dev sandbox). Lifted the pure DECISIONS into core/radio/selection.py:

  - parse_tags / merge_tags  — JSON-or-CSV tag fields → ordered deduped list
  - same_artist_cap          — tier-1 30%-floored-at-5 cap
  - build_like_conditions    — OR-of-LIKEs SQL fragment + params per tier
  - RadioCollector           — dedup + cap + exclude-set + NOT-IN placeholder/value tracking

The DB method keeps the cursor work and now delegates every decision to these
helpers. Faithful extraction, not a rewrite — behavior unchanged.

This is the kettui foundation move: radio is now unit-testable, so Phase 2
(smart ranking — play-count / recency / feature seeding) becomes 'evolve a
tested function' instead of 'rewrite SQL and pray'.

Tests (tests/radio/):
  - test_selection.py (22): unit coverage of every extracted helper
  - test_get_radio_tracks_db.py (7): drive the REAL get_radio_tracks against
    in-memory sqlite — tier fallback, dedup, exclude, file_path filter.
    Behavior-pinned: these 7 pass against BOTH old inline and new extracted
    code (refactor-equivalence proof). 52 adjacent DB+radio tests green.
pull/761/head
BoulderBadgeDad 3 weeks ago
parent 472ec7ea01
commit cbc001e283

@ -0,0 +1,22 @@
"""Radio / auto-play recommendation logic.
Pure, DB-agnostic helpers that decide *what* radio should play. The SQL
execution stays in ``database.music_database.get_radio_tracks``; this package
owns the decisions (tag parsing, tier caps, dedup/collection, LIKE-condition
building) so they're unit-testable without a live DB — the seam Phase 2's
smarter ranking will plug into.
"""
from core.radio.selection import (
    RadioCollector,
    build_like_conditions,
    merge_tags,
    parse_tags,
    same_artist_cap,
)

# Public surface of the package: every helper defined in
# ``core.radio.selection`` is re-exported here, including ``merge_tags``.
__all__ = [
    "RadioCollector",
    "build_like_conditions",
    "merge_tags",
    "parse_tags",
    "same_artist_cap",
]

@ -0,0 +1,139 @@
"""Pure radio-selection decisions, lifted out of the DB layer.
``database.music_database.get_radio_tracks`` used to inline all of this between
``cursor.execute`` calls, so the algorithm couldn't be tested without a live DB
(which also happens to throw in the dev sandbox). These helpers carry the same
behavior as before — they're a faithful extraction, not a rewrite — but as
plain functions they're unit-testable and give Phase 2 (smart ranking) a clean
place to evolve the logic.
Nothing here touches sqlite; callers pass already-fetched rows (as dicts) and
get back decisions.
"""
from __future__ import annotations
import json
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
def parse_tags(raw_val: Any) -> List[str]:
    """Parse a genre/mood/style field into a clean list of tag strings.

    The field may be a JSON array (canonical), a bare JSON scalar, a legacy
    comma-separated string, or an already-parsed list/tuple.

    Every path applies the same normalization: entries are converted to
    ``str``, surrounding whitespace is stripped, and ``None`` / blank entries
    are dropped. Dropping blanks matters because an empty tag would later
    become a ``LIKE '%%'`` condition that matches every row.

    Args:
        raw_val: Raw column value; any falsy value yields ``[]``.

    Returns:
        Tags in their original order (duplicates are not removed here).
    """
    if not raw_val:
        return []

    if isinstance(raw_val, (list, tuple)):
        # Already parsed by the caller; just normalize the entries below.
        candidates = list(raw_val)
    else:
        try:
            parsed = json.loads(raw_val)
        except (json.JSONDecodeError, ValueError, TypeError):
            # Not JSON (or not a str/bytes at all): treat as legacy CSV.
            candidates = str(raw_val).split(",")
        else:
            candidates = parsed if isinstance(parsed, list) else [parsed]

    tags: List[str] = []
    for item in candidates:
        if item is None:
            continue
        text = str(item).strip()
        if text:
            tags.append(text)
    return tags
def same_artist_cap(limit: int) -> int:
    """Return the maximum number of same-artist tracks tier 1 may add.

    The cap is 30% of ``limit`` (integer division) but never lower than 5,
    which keeps radio from turning into a single-artist playlist.
    """
    thirty_percent = (limit * 3) // 10
    return thirty_percent if thirty_percent > 5 else 5
def merge_tags(*tag_groups: Iterable[str]) -> List[str]:
    """Flatten several tag groups into one list without duplicates.

    Tags keep the position of their first occurrence, scanning the groups
    left to right.
    """
    first_seen: Dict[str, None] = {}
    for group in tag_groups:
        for tag in group:
            # dicts preserve insertion order, so the first sighting wins.
            first_seen.setdefault(tag, None)
    return list(first_seen)
def build_like_conditions(
    tags: Sequence[str], columns: Sequence[str]
) -> Tuple[str, List[str]]:
    """Build an OR-joined ``LIKE`` fragment matching ``tags`` in ``columns``.

    Returns ``(sql_fragment, params)``. The fragment holds one
    ``"<col> LIKE ?"`` clause per column per tag, joined with ``" OR "``;
    ``params`` holds the matching ``%tag%`` wildcards in the same order.
    When either ``tags`` or ``columns`` is empty the result is ``("", [])``
    so the caller can skip the tier.

    Clauses are grouped by column: every tag for the first column, then
    every tag for the second, and so on. The parameter list is therefore the
    wildcard list repeated once per column.
    """
    if not (tags and columns):
        return "", []

    wildcards = [f"%{tag}%" for tag in tags]
    clauses = [f"{col} LIKE ?" for col in columns for _ in tags]
    params = [wildcard for _ in columns for wildcard in wildcards]
    return " OR ".join(clauses), params
class RadioCollector:
    """Accumulate radio candidates across tiers with dedup and cap handling.

    Construct with the overall ``limit`` and the IDs to exclude up front
    (seed track plus caller-supplied IDs), then feed each tier's fetched rows
    through :meth:`collect`. IDs are compared as strings throughout.
    """

    def __init__(self, limit: int, exclude_ids: Optional[Iterable[Any]] = None):
        """Create an empty collector.

        Args:
            limit: Maximum number of tracks to collect overall.
            exclude_ids: IDs that must never be collected; each is coerced to
                ``str``.
        """
        self.limit = limit
        self._collected: List[Dict[str, Any]] = []
        # The seen-set starts as the exclude set, so excluded tracks never
        # collect and the NOT IN placeholders/values are built from one source.
        self._seen: set[str] = {str(e) for e in (exclude_ids or [])}

    @property
    def tracks(self) -> List[Dict[str, Any]]:
        """The collected rows, in collection order (the live internal list)."""
        return self._collected

    @property
    def filled(self) -> bool:
        """True once the overall limit has been reached."""
        return len(self._collected) >= self.limit

    def exclude_placeholders(self) -> str:
        """SQL ``?,?,...`` placeholder string sized to the current seen set."""
        return ",".join("?" * len(self._seen))

    def exclude_values(self) -> List[str]:
        """Param values for the placeholders above (current seen set)."""
        return list(self._seen)

    def remaining(self) -> int:
        """How many more tracks are needed to hit the limit (never negative)."""
        return max(0, self.limit - len(self._collected))

    def collect(self, rows: Iterable[Dict[str, Any]], cap: Optional[int] = None) -> bool:
        """Append unseen ``rows`` to the result, honoring the cap and limit.

        Args:
            rows: Dict-like rows, each with an ``"id"`` key.
            cap: Maximum number of rows THIS call may add on top of what is
                already collected. ``None`` means bounded only by the overall
                limit; ``0`` (or a negative value) adds nothing.

        Returns:
            True as soon as this call's target is reached: the overall limit,
            or the cap-bounded target when ``cap`` is given (which may be
            below the overall limit). Otherwise returns :attr:`filled`.
        """
        if cap is None:
            target = self.limit
        else:
            target = min(self.limit, len(self._collected) + max(cap, 0))

        # Check BEFORE appending: a collector that is already at its target
        # (already full, ``limit <= 0`` or ``cap == 0``) must not take one
        # extra row and overshoot.
        if len(self._collected) >= target:
            return True

        for row in rows:
            record = dict(row)
            rid = str(record["id"])
            if rid in self._seen:
                continue
            self._seen.add(rid)
            self._collected.append(record)
            if len(self._collected) >= target:
                return True
        return self.filled

@ -12796,19 +12796,24 @@ class MusicDatabase:
seed = dict(seed)
artist_name = seed['artist_name']
# Build the set of IDs to exclude (seed + caller-supplied)
excluded = {str(track_id)}
if exclude_ids:
excluded.update(str(eid) for eid in exclude_ids)
collected: list[dict] = []
seen_ids: set[str] = set(excluded)
def _exclude_placeholders():
return ','.join('?' * len(seen_ids))
# Selection decisions (dedup, caps, tag parsing, condition
# building) live in core.radio.selection so they're unit-
# testable without a live DB. The cursor work stays here.
from core.radio.selection import (
RadioCollector,
build_like_conditions,
merge_tags,
parse_tags,
same_artist_cap,
)
def _exclude_values():
return list(seen_ids)
# Seed + caller-supplied IDs to exclude (seeds the collector's
# seen-set so excluded tracks never collect and the NOT IN
# placeholders/values stay in sync).
exclude_seed = [str(track_id)]
if exclude_ids:
exclude_seed.extend(str(eid) for eid in exclude_ids)
collector = RadioCollector(limit, exclude_ids=exclude_seed)
_track_select = """
SELECT t.id, t.title, t.track_number, t.duration,
@ -12824,98 +12829,71 @@ class MusicDatabase:
# Only return tracks that have actual files on disk
_file_filter = "t.file_path IS NOT NULL AND t.file_path != ''"
def _collect(rows, cap=None):
"""Append rows to collected. Stop at cap or limit."""
target = min(limit, (len(collected) + cap)) if cap else limit
for row in rows:
r = dict(row)
rid = str(r['id'])
if rid not in seen_ids:
seen_ids.add(rid)
collected.append(r)
if len(collected) >= target:
return True
return len(collected) >= limit
def _parse_tags(raw_val):
"""Parse a JSON array or comma-separated string into a list."""
if not raw_val:
return []
try:
parsed = json.loads(raw_val)
return parsed if isinstance(parsed, list) else [str(parsed)]
except (json.JSONDecodeError, ValueError):
return [t.strip() for t in raw_val.split(',') if t.strip()]
# --- 1. Same artist, different albums (capped at 30% of limit) ---
same_artist_cap = max(5, limit * 3 // 10)
artist_cap = same_artist_cap(limit)
cursor.execute(f"""
{_track_select}
WHERE {_file_filter} AND ar.name = ? AND t.album_id != ? AND t.id NOT IN ({_exclude_placeholders()})
WHERE {_file_filter} AND ar.name = ? AND t.album_id != ? AND t.id NOT IN ({collector.exclude_placeholders()})
ORDER BY RANDOM()
LIMIT ?
""", [artist_name, seed['album_id']] + _exclude_values() + [same_artist_cap])
_collect(cursor.fetchall(), cap=same_artist_cap)
""", [artist_name, seed['album_id']] + collector.exclude_values() + [artist_cap])
collector.collect(cursor.fetchall(), cap=artist_cap)
if len(collected) >= limit:
return {'success': True, 'tracks': collected}
if collector.filled:
return {'success': True, 'tracks': collector.tracks}
# --- 2. Same genre (album genres + artist genres, other artists) ---
genre_list = _parse_tags(seed.get('album_genres'))
artist_genre_list = _parse_tags(seed.get('artist_genres'))
all_genres = list(dict.fromkeys(genre_list + artist_genre_list)) # dedupe, preserve order
if all_genres:
genre_conditions = ' OR '.join(
['al.genres LIKE ?' for _ in all_genres] +
['ar.genres LIKE ?' for _ in all_genres]
)
genre_params = [f'%{g}%' for g in all_genres] * 2
all_genres = merge_tags(
parse_tags(seed.get('album_genres')),
parse_tags(seed.get('artist_genres')),
)
genre_conditions, genre_params = build_like_conditions(
all_genres, ('al.genres', 'ar.genres')
)
if genre_conditions:
cursor.execute(f"""
{_track_select}
WHERE {_file_filter} AND ({genre_conditions})
AND ar.name != ?
AND t.id NOT IN ({_exclude_placeholders()})
AND t.id NOT IN ({collector.exclude_placeholders()})
ORDER BY RANDOM()
LIMIT ?
""", genre_params + [artist_name] + _exclude_values() + [limit - len(collected)])
if _collect(cursor.fetchall()):
return {'success': True, 'tracks': collected}
""", genre_params + [artist_name] + collector.exclude_values() + [collector.remaining()])
if collector.collect(cursor.fetchall()):
return {'success': True, 'tracks': collector.tracks}
# --- 3. Same mood / style (album + artist level) ---
for field_name in ('mood', 'style'):
album_tags = _parse_tags(seed.get(f'album_{field_name}'))
artist_tags = _parse_tags(seed.get(f'artist_{field_name}'))
all_tags = list(dict.fromkeys(album_tags + artist_tags))
if all_tags:
tag_conditions = ' OR '.join(
[f'al.{field_name} LIKE ?' for _ in all_tags] +
[f'ar.{field_name} LIKE ?' for _ in all_tags]
)
tag_params = [f'%{t}%' for t in all_tags] * 2
all_tags = merge_tags(
parse_tags(seed.get(f'album_{field_name}')),
parse_tags(seed.get(f'artist_{field_name}')),
)
tag_conditions, tag_params = build_like_conditions(
all_tags, (f'al.{field_name}', f'ar.{field_name}')
)
if tag_conditions:
cursor.execute(f"""
{_track_select}
WHERE {_file_filter} AND ({tag_conditions})
AND ar.name != ?
AND t.id NOT IN ({_exclude_placeholders()})
AND t.id NOT IN ({collector.exclude_placeholders()})
ORDER BY RANDOM()
LIMIT ?
""", tag_params + [artist_name] + _exclude_values() + [limit - len(collected)])
if _collect(cursor.fetchall()):
return {'success': True, 'tracks': collected}
""", tag_params + [artist_name] + collector.exclude_values() + [collector.remaining()])
if collector.collect(cursor.fetchall()):
return {'success': True, 'tracks': collector.tracks}
# --- 4. Random library tracks ---
if len(collected) < limit:
if not collector.filled:
cursor.execute(f"""
{_track_select}
WHERE {_file_filter} AND t.id NOT IN ({_exclude_placeholders()})
WHERE {_file_filter} AND t.id NOT IN ({collector.exclude_placeholders()})
ORDER BY RANDOM()
LIMIT ?
""", _exclude_values() + [limit - len(collected)])
_collect(cursor.fetchall())
""", collector.exclude_values() + [collector.remaining()])
collector.collect(cursor.fetchall())
return {'success': True, 'tracks': collected}
return {'success': True, 'tracks': collector.tracks}
except Exception as e:
logger.error(f"Error getting radio tracks for track {track_id}: {e}")

@ -0,0 +1,37 @@
# Stream / Player / Radio Revamp — Plan
Goal: bring the audio stream + media-player + radio system to Spotify/Apple-level polish and feature set. Target stack: **plain JS** (`webui/static/media-player.js`), not the React migration. Intended architecture direction: **multi-listener** (final call deferred to Phase 3; Phases 0–2 stay compatible either way).
Rule for every phase: kettui standard — importable/testable logic, seam-level + differential tests, break nothing, ship one reviewable phase at a time.
---
## Phase 0 — Make it provable (foundation, no user-visible change)
- [ ] **0a. Extract radio selection logic into testable `core/radio/`.** The algorithm (tier orchestration, cap math, dedup, tag parsing, SQL-condition building) is currently tangled with `cursor.execute` inside `database/music_database.py:get_radio_tracks` (~12756) — untestable without a live DB. Pull the pure decisions into `core/radio/selection.py`; the DB method keeps SQL execution but delegates the decisions. Differential-test: same inputs → same output as today.
- [ ] **0b. Centralize frontend player state.** ~10 scattered `np*` globals in `media-player.js` → one `PlayerState` object. Seam for every later frontend phase. No behavior change.
## Phase 1 — Polish / feel (frontend)
- [ ] Persistent queue across refresh (localStorage first; server-side in P3)
- [ ] Drag-to-reorder queue; duration + art per queue item
- [ ] Seek tooltip (hover timestamp); smoother progress
- [ ] Crossfade via dual-`<audio>` swap (honest approximation of gapless — true gapless impossible w/ single element)
- [ ] Full Media Session API (lockscreen / hardware transport keys)
- [ ] Keyboard shortcut overlay + fuller bindings
## Phase 2 — Smart radio (backend algorithm)
- [ ] Replace `ORDER BY RANDOM()` with real seeding: play-count + recency weighting, genre-adjacency, recently-played memory. Slots into the Phase-0a pure module → fully unit-testable (seed → expected ordering). Both radio buttons benefit (shared function).
## Phase 3 — Architecture (deepest, riskiest — listener decision lands here)
- [ ] Per-session (or multi-tenant) stream state — replaces the single global `stream_state` + 1-worker executor + single `Stream/` staging file (`web_server.py:747`).
- [ ] Server-side persistent queue (resume across devices/refresh).
- [ ] Final multi-listener vs single-listener scope decided here, with real usage in hand.
---
## Order of execution
0a (radio extraction) → 2 (smart radio) first: highest *visible* upgrade, backend-only, cleanest to prove, zero playback risk. Then 0b → 1 (polish). Then 3 (architecture) last.

@ -0,0 +1,222 @@
"""End-to-end behavioral pin for MusicDatabase.get_radio_tracks.
Phase 0a extracted the radio SELECTION logic into core.radio.selection but the
DB method still owns the SQL. These tests drive the REAL get_radio_tracks
against an in-memory sqlite to prove the refactor preserved behavior: the
4-tier fallback (same-artist cap → genre → mood/style → random), dedup, and
exclude handling all still work through the extracted helpers.
Reuses the in-memory MusicDatabase harness pattern from
tests/test_reorganize_db_methods.py.
"""
import sqlite3
import sys
import types
import pytest
# ── stubs (same shape used elsewhere in the suite) ────────────────────────
# Install a minimal fake ``spotipy`` package (with an ``oauth2`` submodule)
# unless a module of that name is already registered in ``sys.modules``.
# NOTE(review): presumably this lets ``database.music_database`` be imported
# without the real dependency installed — confirm.
if "spotipy" not in sys.modules:
    spotipy = types.ModuleType("spotipy")
    spotipy.Spotify = object
    oauth2 = types.ModuleType("spotipy.oauth2")
    oauth2.SpotifyOAuth = object
    oauth2.SpotifyClientCredentials = object
    spotipy.oauth2 = oauth2
    sys.modules["spotipy"] = spotipy
    sys.modules["spotipy.oauth2"] = oauth2

# Same approach for ``config.settings``. Note that the guard only checks
# ``config.settings``, yet the body also replaces the ``config`` entry.
if "config.settings" not in sys.modules:
    config_pkg = types.ModuleType("config")
    settings_mod = types.ModuleType("config.settings")

    class _DummyConfigManager:
        # Stand-in config manager: ``get`` always returns the supplied default.
        def get(self, key, default=None):
            return default

        # Fixed answer; the value is simply the literal below.
        def get_active_media_server(self):
            return "primary"

    settings_mod.config_manager = _DummyConfigManager()
    config_pkg.settings = settings_mod
    sys.modules["config"] = config_pkg
    sys.modules["config.settings"] = settings_mod
from database.music_database import MusicDatabase # noqa: E402
class _InMemoryDB(MusicDatabase):
    """``MusicDatabase`` backed by a single shared in-memory sqlite connection.

    ``__init__`` does not call the parent initializer; it only opens the
    connection. ``_get_connection`` hands out a wrapper whose ``close()`` is
    a no-op, so the one in-memory connection persists between calls.
    """

    def __init__(self):
        memory_conn = sqlite3.connect(":memory:")
        # Rows behave like mappings (name-based access), as dict(row) needs.
        memory_conn.row_factory = sqlite3.Row
        self._conn = memory_conn

    def _get_connection(self):
        shared = self._conn
        return _NonClosingConn(shared)
class _NonClosingConn:
    """Thin connection wrapper whose ``close()`` and ``with`` exit do nothing.

    ``cursor()`` and ``commit()`` are forwarded to the wrapped connection.
    """

    def __init__(self, real):
        self._wrapped = real

    def cursor(self):
        new_cursor = self._wrapped.cursor()
        return new_cursor

    def commit(self):
        result = self._wrapped.commit()
        return result

    def close(self):
        # Intentionally a no-op: the wrapped connection stays open.
        return None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Returns None, so exceptions raised inside the ``with`` propagate.
        return None
def _schema(db):
    """Create the ``artists``, ``albums`` and ``tracks`` tables on ``db._conn``.

    The three statements run on one cursor, in that order, followed by a
    single commit.
    """
    ddl_statements = (
        """
        CREATE TABLE artists (
            id TEXT PRIMARY KEY, name TEXT,
            genres TEXT, mood TEXT, style TEXT, thumb_url TEXT
        )
        """,
        """
        CREATE TABLE albums (
            id TEXT PRIMARY KEY, artist_id TEXT, title TEXT,
            genres TEXT, mood TEXT, style TEXT, thumb_url TEXT
        )
        """,
        """
        CREATE TABLE tracks (
            id TEXT PRIMARY KEY, album_id TEXT, artist_id TEXT,
            title TEXT, track_number INTEGER, duration INTEGER,
            file_path TEXT, bitrate INTEGER
        )
        """,
    )
    cur = db._conn.cursor()
    for ddl in ddl_statements:
        cur.execute(ddl)
    db._conn.commit()
def _add_artist(db, aid, name, genres="", mood="", style=""):
    """Insert one artist row with an empty ``thumb_url``; does not commit."""
    values = (aid, name, genres, mood, style, "")
    db._conn.execute(
        "INSERT INTO artists (id, name, genres, mood, style, thumb_url) VALUES (?,?,?,?,?,?)",
        values,
    )
def _add_album(db, alid, aid, title, genres="", mood="", style=""):
    """Insert one album row with an empty ``thumb_url``; does not commit."""
    values = (alid, aid, title, genres, mood, style, "")
    db._conn.execute(
        "INSERT INTO albums (id, artist_id, title, genres, mood, style, thumb_url) VALUES (?,?,?,?,?,?,?)",
        values,
    )
def _add_track(db, tid, alid, aid, title, file_path="/m/x.flac"):
    """Insert one track row; does not commit.

    ``track_number``, ``duration`` and ``bitrate`` are fixed at 1, 200 and
    1000 respectively.
    """
    values = (tid, alid, aid, title, 1, 200, file_path, 1000)
    insert_sql = (
        "INSERT INTO tracks (id, album_id, artist_id, title, track_number, duration, file_path, bitrate) "
        "VALUES (?,?,?,?,?,?,?,?)"
    )
    db._conn.execute(insert_sql, values)
@pytest.fixture
def db():
    """Provide a fresh in-memory database with the three test tables created."""
    database = _InMemoryDB()
    _schema(database)
    return database
def test_missing_seed_track_returns_failure(db):
    """A seed id that is not in the (empty) tracks table reports failure."""
    outcome = db.get_radio_tracks("nope", limit=10)
    assert outcome["success"] is False
def test_tier1_same_artist_other_albums(db):
    """Tier 1 returns same-artist tracks from other albums, never the seed.

    An unrelated track is also present so that, with ``limit=1``, only the
    same-artist tier can explain the result. Without it the random fallback
    would return the very same track and the test could not tell the tiers
    apart.
    """
    _add_artist(db, "ar1", "Artist One")
    _add_artist(db, "ar2", "Unrelated Artist")
    _add_album(db, "al1", "ar1", "Album A")
    _add_album(db, "al2", "ar1", "Album B")
    _add_album(db, "al3", "ar2", "Unrelated Album")
    _add_track(db, "seed", "al1", "ar1", "Seed")
    _add_track(db, "t2", "al2", "ar1", "Other Album Track")
    _add_track(db, "u1", "al3", "ar2", "Unrelated Track")
    db._conn.commit()

    res = db.get_radio_tracks("seed", limit=10)
    assert res["success"] is True
    ids = [t["id"] for t in res["tracks"]]
    assert "t2" in ids
    assert "seed" not in ids  # seed always excluded

    # With room for a single track, the same-artist tier must fill it before
    # the random tier ever runs.
    first_only = db.get_radio_tracks("seed", limit=1)
    assert first_only["success"] is True
    assert [t["id"] for t in first_only["tracks"]] == ["t2"]
def test_excludes_caller_supplied_ids(db):
    """IDs passed via ``exclude_ids`` are left out; other candidates remain."""
    _add_artist(db, "ar1", "Artist One")
    for album_id, album_title in (("al1", "Album A"), ("al2", "Album B")):
        _add_album(db, album_id, "ar1", album_title)
    _add_track(db, "seed", "al1", "ar1", "Seed")
    _add_track(db, "t2", "al2", "ar1", "T2")
    _add_track(db, "t3", "al2", "ar1", "T3")
    db._conn.commit()

    outcome = db.get_radio_tracks("seed", limit=10, exclude_ids=["t2"])
    returned_ids = [track["id"] for track in outcome["tracks"]]
    assert "t2" not in returned_ids
    assert "t3" in returned_ids
def test_tier2_genre_match_other_artists(db):
    """With no same-artist alternatives, the genre tier supplies the match.

    A genre-less track by a third artist is also present so that, with
    ``limit=1``, only the genre tier can explain the result. Without it the
    random fallback would return the same track and mask a broken genre tier.
    """
    _add_artist(db, "ar1", "Seed Artist", genres='["shoegaze"]')
    _add_artist(db, "ar2", "Other Artist", genres='["shoegaze"]')
    _add_artist(db, "ar3", "Unrelated Artist")
    _add_album(db, "al1", "ar1", "Seed Album", genres='["shoegaze"]')
    _add_album(db, "al2", "ar2", "Other Album", genres='["shoegaze"]')
    _add_album(db, "al3", "ar3", "Unrelated Album")
    _add_track(db, "seed", "al1", "ar1", "Seed")
    _add_track(db, "g1", "al2", "ar2", "Genre Match")
    _add_track(db, "u1", "al3", "ar3", "Unrelated Track")
    db._conn.commit()

    res = db.get_radio_tracks("seed", limit=10)
    ids = [t["id"] for t in res["tracks"]]
    assert "g1" in ids

    # With room for a single track, the genre tier must fill it before the
    # random tier ever runs.
    first_only = db.get_radio_tracks("seed", limit=1)
    assert first_only["success"] is True
    assert [t["id"] for t in first_only["tracks"]] == ["g1"]
def test_tier4_random_fallback_fills_when_no_metadata_match(db):
    """A seed with no tags and no same-artist alternatives still gets tracks.

    The only other track belongs to an unrelated artist with no tags, so it
    can only arrive through the random tier.
    """
    for artist_id, artist_name in (("ar1", "Seed Artist"), ("ar2", "Unrelated")):
        _add_artist(db, artist_id, artist_name)
    _add_album(db, "al1", "ar1", "Seed Album")
    _add_album(db, "al2", "ar2", "Unrelated Album")
    _add_track(db, "seed", "al1", "ar1", "Seed")
    _add_track(db, "r1", "al2", "ar2", "Random One")
    db._conn.commit()

    outcome = db.get_radio_tracks("seed", limit=10)
    returned_ids = [track["id"] for track in outcome["tracks"]]
    assert "r1" in returned_ids  # filled from random tier
def test_only_returns_tracks_with_files(db):
    """A track whose ``file_path`` is empty is never returned."""
    _add_artist(db, "ar1", "Artist One")
    for album_id, album_title in (("al1", "Album A"), ("al2", "Album B")):
        _add_album(db, album_id, "ar1", album_title)
    _add_track(db, "seed", "al1", "ar1", "Seed")
    _add_track(db, "nofile", "al2", "ar1", "No File", file_path="")
    db._conn.commit()

    outcome = db.get_radio_tracks("seed", limit=10)
    returned_ids = [track["id"] for track in outcome["tracks"]]
    assert "nofile" not in returned_ids  # file_path filter still enforced
def test_no_duplicate_ids_across_tiers(db):
    """A track picked up by an early tier appears exactly once in the result."""
    pop = '["pop"]'
    _add_artist(db, "ar1", "Artist One", genres=pop)
    _add_album(db, "al1", "ar1", "Album A", genres=pop)
    _add_album(db, "al2", "ar1", "Album B", genres=pop)
    _add_track(db, "seed", "al1", "ar1", "Seed")
    _add_track(db, "dup", "al2", "ar1", "Could Match Twice")
    db._conn.commit()

    outcome = db.get_radio_tracks("seed", limit=10)
    returned_ids = [track["id"] for track in outcome["tracks"]]
    occurrences = returned_ids.count("dup")
    assert occurrences == 1

@ -0,0 +1,139 @@
"""Tests for the extracted radio-selection logic (Phase 0a of the player revamp).
These pin the behavior that used to be inline + untestable inside
``database.music_database.get_radio_tracks``. They lock current behavior so
Phase 2 (smart ranking) can evolve it against a green baseline.
"""
from __future__ import annotations
from core.radio.selection import (
RadioCollector,
build_like_conditions,
merge_tags,
parse_tags,
same_artist_cap,
)
class TestParseTags:
    """Pins ``parse_tags`` for JSON, legacy CSV, empty and non-JSON input."""

    def test_json_array(self):
        parsed = parse_tags('["rock", "indie"]')
        assert parsed == ["rock", "indie"]

    def test_comma_separated_legacy(self):
        parsed = parse_tags("rock, indie, folk")
        assert parsed == ["rock", "indie", "folk"]

    def test_comma_separated_strips_whitespace_and_blanks(self):
        parsed = parse_tags("rock, , indie ,")
        assert parsed == ["rock", "indie"]

    def test_empty_and_none(self):
        for empty_value in ("", None):
            assert parse_tags(empty_value) == []

    def test_non_list_json_scalar_wrapped(self):
        # A bare JSON scalar (here a quoted string) comes back as a
        # one-element list.
        parsed = parse_tags('"rock"')
        assert parsed == ["rock"]

    def test_garbage_falls_back_to_split(self):
        parsed = parse_tags("not json at all")
        assert parsed == ["not json at all"]
class TestSameArtistCap:
    """Pins the 30%-of-limit cap and its floor of 5."""

    def test_thirty_percent(self):
        expected_by_limit = {50: 15, 20: 6}  # limit * 3 // 10
        for limit, expected in expected_by_limit.items():
            assert same_artist_cap(limit) == expected

    def test_floored_at_five(self):
        # 10 * 3 // 10 is 3 and 1 * 3 // 10 is 0; both are raised to 5.
        for small_limit in (10, 1):
            assert same_artist_cap(small_limit) == 5
class TestMergeTags:
    """Pins order-preserving de-duplication across tag groups."""

    def test_dedupes_preserving_order(self):
        first_group = ["rock", "indie"]
        second_group = ["indie", "folk"]
        merged = merge_tags(first_group, second_group)
        assert merged == ["rock", "indie", "folk"]

    def test_empty_groups(self):
        merged = merge_tags([], [])
        assert merged == []
class TestBuildLikeConditions:
    """Pins the SQL fragment and parameter order of ``build_like_conditions``."""

    def test_single_tag_two_columns(self):
        fragment, bound = build_like_conditions(["rock"], ("al.genres", "ar.genres"))
        assert fragment == "al.genres LIKE ? OR ar.genres LIKE ?"
        assert bound == ["%rock%", "%rock%"]

    def test_grouping_matches_original_order(self):
        # Clauses are grouped by column: all album-column LIKEs first, then
        # all artist-column LIKEs, with the wildcard list repeated per column.
        fragment, bound = build_like_conditions(
            ["rock", "indie"], ("al.genres", "ar.genres")
        )
        expected_fragment = (
            "al.genres LIKE ? OR al.genres LIKE ? OR "
            "ar.genres LIKE ? OR ar.genres LIKE ?"
        )
        assert fragment == expected_fragment
        assert bound == ["%rock%", "%indie%", "%rock%", "%indie%"]

    def test_no_tags_returns_empty(self):
        result = build_like_conditions([], ("al.genres",))
        assert result == ("", [])

    def test_no_columns_returns_empty(self):
        result = build_like_conditions(["rock"], ())
        assert result == ("", [])
class TestRadioCollector:
    """Pins dedup, exclusion, cap and limit behavior of ``RadioCollector``."""

    def _rows(self, *ids):
        rows = []
        for i in ids:
            rows.append({"id": i, "title": f"t{i}"})
        return rows

    @staticmethod
    def _collected_ids(collector):
        # IDs of everything collected so far, in collection order.
        return [t["id"] for t in collector.tracks]

    def test_collects_and_dedupes(self):
        collector = RadioCollector(limit=10)
        collector.collect(self._rows(1, 2, 2, 3))  # the repeated 2 is ignored
        assert self._collected_ids(collector) == [1, 2, 3]

    def test_excludes_seed_and_caller_ids(self):
        collector = RadioCollector(limit=10, exclude_ids=["1", "2"])
        collector.collect(self._rows(1, 2, 3, 4))
        assert self._collected_ids(collector) == [3, 4]

    def test_exclude_ids_coerced_to_str(self):
        # Callers may pass ints; the seen-set stores strings.
        collector = RadioCollector(limit=10, exclude_ids=[1])
        collector.collect(self._rows(1, 2))
        assert self._collected_ids(collector) == [2]

    def test_cap_bounds_a_single_tier(self):
        collector = RadioCollector(limit=10)
        collector.collect(self._rows(1, 2, 3, 4, 5), cap=2)  # only 2 from this tier
        assert self._collected_ids(collector) == [1, 2]
        assert not collector.filled
        assert collector.remaining() == 8

    def test_filled_at_limit(self):
        collector = RadioCollector(limit=3)
        reached = collector.collect(self._rows(1, 2, 3, 4))
        assert reached is True
        assert collector.filled
        assert len(collector.tracks) == 3
        assert collector.remaining() == 0

    def test_capped_collect_returns_true_at_cap_target(self):
        # A capped collect returns True once the cap-bounded target is hit,
        # even when that is below the overall limit; ``filled`` is the signal
        # for the overall limit.
        collector = RadioCollector(limit=5)
        hit_cap = collector.collect(self._rows(1, 2), cap=2)
        assert hit_cap is True  # hit cap target (2)
        assert not collector.filled  # but not at limit (5)

    def test_uncapped_collect_returns_true_only_at_limit(self):
        collector = RadioCollector(limit=5)
        first = collector.collect(self._rows(1, 2))
        assert first is False  # below limit
        second = collector.collect(self._rows(3, 4, 5))
        assert second is True  # now at limit

    def test_exclude_placeholders_and_values_track_seen_set(self):
        collector = RadioCollector(limit=10, exclude_ids=["a", "b"])
        assert collector.exclude_placeholders() == "?,?"
        assert set(collector.exclude_values()) == {"a", "b"}
        # Collected IDs join the NOT-IN set, so the next tier's SQL will not
        # pull them again.
        collector.collect(self._rows("c"))
        assert collector.exclude_placeholders() == "?,?,?"
        assert set(collector.exclude_values()) == {"a", "b", "c"}
Loading…
Cancel
Save