mirror of https://github.com/Nezreka/SoulSync.git
First step of the stream/player/radio revamp (see revamp_plan.md). The radio
algorithm lived inline inside database.music_database.get_radio_tracks as raw
SQL tangled with selection logic — untestable without a live DB (which also
throws in the dev sandbox). Lifted the pure DECISIONS into core/radio/selection.py:
- parse_tags / merge_tags — JSON-or-CSV tag fields → ordered deduped list
- same_artist_cap — tier-1 30%-floored-at-5 cap
- build_like_conditions — OR-of-LIKEs SQL fragment + params per tier
- RadioCollector — dedup + cap + exclude-set + NOT-IN placeholder/value tracking
The DB method keeps the cursor work and now delegates every decision to these
helpers. Faithful extraction, not a rewrite — behavior unchanged.
This is the kettui foundation move: radio is now unit-testable, so Phase 2
(smart ranking — play-count / recency / feature seeding) becomes 'evolve a
tested function' instead of 'rewrite SQL and pray'.
Tests (tests/radio/):
- test_selection.py (22): unit coverage of every extracted helper
- test_get_radio_tracks_db.py (7): drive the REAL get_radio_tracks against
in-memory sqlite — tier fallback, dedup, exclude, file_path filter.
Behavior-pinned: these 7 pass against BOTH old inline and new extracted
code (refactor-equivalence proof). 52 adjacent DB+radio tests green.
pull/761/head
parent
472ec7ea01
commit
cbc001e283
@ -0,0 +1,22 @@
|
||||
"""Radio / auto-play recommendation logic.
|
||||
|
||||
Pure, DB-agnostic helpers that decide *what* radio should play. The SQL
|
||||
execution stays in ``database.music_database.get_radio_tracks``; this package
|
||||
owns the decisions (tag parsing, tier caps, dedup/collection, LIKE-condition
|
||||
building) so they're unit-testable without a live DB — the seam Phase 2's
|
||||
smarter ranking will plug into.
|
||||
"""
|
||||
|
||||
from core.radio.selection import (
|
||||
RadioCollector,
|
||||
build_like_conditions,
|
||||
parse_tags,
|
||||
same_artist_cap,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"RadioCollector",
|
||||
"build_like_conditions",
|
||||
"parse_tags",
|
||||
"same_artist_cap",
|
||||
]
|
||||
@ -0,0 +1,139 @@
|
||||
"""Pure radio-selection decisions, lifted out of the DB layer.
|
||||
|
||||
``database.music_database.get_radio_tracks`` used to inline all of this between
|
||||
``cursor.execute`` calls, so the algorithm couldn't be tested without a live DB
|
||||
(which also happens to throw in the dev sandbox). These helpers carry the same
|
||||
behavior as before — they're a faithful extraction, not a rewrite — but as
|
||||
plain functions they're unit-testable and give Phase 2 (smart ranking) a clean
|
||||
place to evolve the logic.
|
||||
|
||||
Nothing here touches sqlite; callers pass already-fetched rows (as dicts) and
|
||||
get back decisions.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
|
||||
|
||||
|
||||
def parse_tags(raw_val: Any) -> List[str]:
|
||||
"""Parse a genre/mood/style field into a list of tags.
|
||||
|
||||
The field may be a JSON array (canonical) or a legacy comma-separated
|
||||
string. Mirrors the inline ``_parse_tags`` the DB method used.
|
||||
"""
|
||||
if not raw_val:
|
||||
return []
|
||||
try:
|
||||
parsed = json.loads(raw_val)
|
||||
return parsed if isinstance(parsed, list) else [str(parsed)]
|
||||
except (json.JSONDecodeError, ValueError, TypeError):
|
||||
return [t.strip() for t in str(raw_val).split(",") if t.strip()]
|
||||
|
||||
|
||||
def same_artist_cap(limit: int) -> int:
|
||||
"""How many same-artist tracks tier 1 may contribute.
|
||||
|
||||
Capped so radio doesn't become an all-one-artist playlist: 30% of the
|
||||
limit, floored at 5 (matches the original ``max(5, limit * 3 // 10)``).
|
||||
"""
|
||||
return max(5, limit * 3 // 10)
|
||||
|
||||
|
||||
def merge_tags(*tag_groups: Iterable[str]) -> List[str]:
|
||||
"""Concatenate tag lists, dedupe, preserve first-seen order.
|
||||
|
||||
Mirrors ``list(dict.fromkeys(a + b))`` used for genre/mood/style merges.
|
||||
"""
|
||||
merged: List[str] = []
|
||||
for group in tag_groups:
|
||||
for tag in group:
|
||||
merged.append(tag)
|
||||
return list(dict.fromkeys(merged))
|
||||
|
||||
|
||||
def build_like_conditions(
|
||||
tags: Sequence[str], columns: Sequence[str]
|
||||
) -> Tuple[str, List[str]]:
|
||||
"""Build an OR-of-LIKEs SQL fragment + params for matching ``tags``
|
||||
against each of ``columns``.
|
||||
|
||||
Returns ``(sql_fragment, params)`` where the fragment is
|
||||
``"col1 LIKE ? OR col1 LIKE ? OR col2 LIKE ? ..."`` (one LIKE per
|
||||
column per tag) and params are the ``%tag%`` wildcards in matching
|
||||
order. Returns ``("", [])`` when there are no tags or no columns, so
|
||||
callers can skip the tier cleanly.
|
||||
|
||||
This reproduces the original per-tier condition building, which paired
|
||||
every tag against album-level and artist-level columns.
|
||||
"""
|
||||
if not tags or not columns:
|
||||
return "", []
|
||||
conditions: List[str] = []
|
||||
params: List[str] = []
|
||||
# Group by column (all tags for column A, then all tags for column B) to
|
||||
# match the original ordering: it emitted every ``al.<f> LIKE ?`` then
|
||||
# every ``ar.<f> LIKE ?``, with params being ``[%tag%...] * 2``.
|
||||
for col in columns:
|
||||
for tag in tags:
|
||||
conditions.append(f"{col} LIKE ?")
|
||||
params.append(f"%{tag}%")
|
||||
return " OR ".join(conditions), params
|
||||
|
||||
|
||||
class RadioCollector:
|
||||
"""Accumulates radio candidates across tiers with dedup + cap logic.
|
||||
|
||||
Replaces the inline ``collected`` list + ``seen_ids`` set + ``_collect``
|
||||
closure the DB method used. Construct with the overall ``limit`` and the
|
||||
set of IDs to exclude up front (seed track + caller-supplied), then feed
|
||||
each tier's fetched rows through :meth:`collect`.
|
||||
"""
|
||||
|
||||
def __init__(self, limit: int, exclude_ids: Optional[Iterable[Any]] = None):
|
||||
self.limit = limit
|
||||
self._collected: List[Dict[str, Any]] = []
|
||||
# seen_ids seeds with the exclude set so excluded tracks never collect
|
||||
# AND so the placeholders/values used in WHERE ... NOT IN stay in sync.
|
||||
self._seen: set[str] = {str(e) for e in (exclude_ids or [])}
|
||||
|
||||
@property
|
||||
def tracks(self) -> List[Dict[str, Any]]:
|
||||
return self._collected
|
||||
|
||||
@property
|
||||
def filled(self) -> bool:
|
||||
"""True once we've reached the overall limit."""
|
||||
return len(self._collected) >= self.limit
|
||||
|
||||
def exclude_placeholders(self) -> str:
|
||||
"""SQL ``?,?,...`` placeholder string sized to the current seen set."""
|
||||
return ",".join("?" * len(self._seen))
|
||||
|
||||
def exclude_values(self) -> List[str]:
|
||||
"""Param values for the placeholders above (current seen set)."""
|
||||
return list(self._seen)
|
||||
|
||||
def remaining(self) -> int:
|
||||
"""How many more tracks are needed to hit the limit."""
|
||||
return max(0, self.limit - len(self._collected))
|
||||
|
||||
def collect(self, rows: Iterable[Dict[str, Any]], cap: Optional[int] = None) -> bool:
|
||||
"""Append ``rows`` (dict-like) to the result, skipping already-seen IDs.
|
||||
|
||||
``cap`` bounds how many THIS call may add (on top of what's already
|
||||
collected); ``None`` means bounded only by the overall limit. Returns
|
||||
True once the overall limit is reached. Mirrors the original
|
||||
``_collect`` closure exactly.
|
||||
"""
|
||||
target = min(self.limit, len(self._collected) + cap) if cap else self.limit
|
||||
for row in rows:
|
||||
r = dict(row)
|
||||
rid = str(r["id"])
|
||||
if rid not in self._seen:
|
||||
self._seen.add(rid)
|
||||
self._collected.append(r)
|
||||
if len(self._collected) >= target:
|
||||
return True
|
||||
return self.filled
|
||||
@ -0,0 +1,222 @@
|
||||
"""End-to-end behavioral pin for MusicDatabase.get_radio_tracks.
|
||||
|
||||
Phase 0a extracted the radio SELECTION logic into core.radio.selection but the
|
||||
DB method still owns the SQL. These tests drive the REAL get_radio_tracks
|
||||
against an in-memory sqlite to prove the refactor preserved behavior — the
|
||||
4-tier fallback (same-artist cap → genre → mood/style → random), dedup, and
|
||||
exclude handling all still work through the extracted helpers.
|
||||
|
||||
Reuses the in-memory MusicDatabase harness pattern from
|
||||
tests/test_reorganize_db_methods.py.
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
import sys
|
||||
import types
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ── stubs (same shape used elsewhere in the suite) ────────────────────────
|
||||
if "spotipy" not in sys.modules:
|
||||
spotipy = types.ModuleType("spotipy")
|
||||
spotipy.Spotify = object
|
||||
oauth2 = types.ModuleType("spotipy.oauth2")
|
||||
oauth2.SpotifyOAuth = object
|
||||
oauth2.SpotifyClientCredentials = object
|
||||
spotipy.oauth2 = oauth2
|
||||
sys.modules["spotipy"] = spotipy
|
||||
sys.modules["spotipy.oauth2"] = oauth2
|
||||
|
||||
if "config.settings" not in sys.modules:
|
||||
config_pkg = types.ModuleType("config")
|
||||
settings_mod = types.ModuleType("config.settings")
|
||||
|
||||
class _DummyConfigManager:
|
||||
def get(self, key, default=None):
|
||||
return default
|
||||
|
||||
def get_active_media_server(self):
|
||||
return "primary"
|
||||
|
||||
settings_mod.config_manager = _DummyConfigManager()
|
||||
config_pkg.settings = settings_mod
|
||||
sys.modules["config"] = config_pkg
|
||||
sys.modules["config.settings"] = settings_mod
|
||||
|
||||
|
||||
from database.music_database import MusicDatabase # noqa: E402
|
||||
|
||||
|
||||
class _InMemoryDB(MusicDatabase):
|
||||
def __init__(self):
|
||||
self._conn = sqlite3.connect(":memory:")
|
||||
self._conn.row_factory = sqlite3.Row
|
||||
|
||||
def _get_connection(self):
|
||||
return _NonClosingConn(self._conn)
|
||||
|
||||
|
||||
class _NonClosingConn:
|
||||
def __init__(self, real):
|
||||
self._real = real
|
||||
|
||||
def cursor(self):
|
||||
return self._real.cursor()
|
||||
|
||||
def commit(self):
|
||||
return self._real.commit()
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
pass
|
||||
|
||||
|
||||
def _schema(db):
|
||||
cur = db._conn.cursor()
|
||||
cur.execute("""
|
||||
CREATE TABLE artists (
|
||||
id TEXT PRIMARY KEY, name TEXT,
|
||||
genres TEXT, mood TEXT, style TEXT, thumb_url TEXT
|
||||
)
|
||||
""")
|
||||
cur.execute("""
|
||||
CREATE TABLE albums (
|
||||
id TEXT PRIMARY KEY, artist_id TEXT, title TEXT,
|
||||
genres TEXT, mood TEXT, style TEXT, thumb_url TEXT
|
||||
)
|
||||
""")
|
||||
cur.execute("""
|
||||
CREATE TABLE tracks (
|
||||
id TEXT PRIMARY KEY, album_id TEXT, artist_id TEXT,
|
||||
title TEXT, track_number INTEGER, duration INTEGER,
|
||||
file_path TEXT, bitrate INTEGER
|
||||
)
|
||||
""")
|
||||
db._conn.commit()
|
||||
|
||||
|
||||
def _add_artist(db, aid, name, genres="", mood="", style=""):
|
||||
db._conn.execute(
|
||||
"INSERT INTO artists (id, name, genres, mood, style, thumb_url) VALUES (?,?,?,?,?,?)",
|
||||
(aid, name, genres, mood, style, ""),
|
||||
)
|
||||
|
||||
|
||||
def _add_album(db, alid, aid, title, genres="", mood="", style=""):
|
||||
db._conn.execute(
|
||||
"INSERT INTO albums (id, artist_id, title, genres, mood, style, thumb_url) VALUES (?,?,?,?,?,?,?)",
|
||||
(alid, aid, title, genres, mood, style, ""),
|
||||
)
|
||||
|
||||
|
||||
def _add_track(db, tid, alid, aid, title, file_path="/m/x.flac"):
|
||||
db._conn.execute(
|
||||
"INSERT INTO tracks (id, album_id, artist_id, title, track_number, duration, file_path, bitrate) "
|
||||
"VALUES (?,?,?,?,?,?,?,?)",
|
||||
(tid, alid, aid, title, 1, 200, file_path, 1000),
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def db():
|
||||
d = _InMemoryDB()
|
||||
_schema(d)
|
||||
return d
|
||||
|
||||
|
||||
def test_missing_seed_track_returns_failure(db):
|
||||
res = db.get_radio_tracks("nope", limit=10)
|
||||
assert res["success"] is False
|
||||
|
||||
|
||||
def test_tier1_same_artist_other_albums(db):
|
||||
_add_artist(db, "ar1", "Artist One")
|
||||
_add_album(db, "al1", "ar1", "Album A")
|
||||
_add_album(db, "al2", "ar1", "Album B")
|
||||
_add_track(db, "seed", "al1", "ar1", "Seed")
|
||||
_add_track(db, "t2", "al2", "ar1", "Other Album Track")
|
||||
db._conn.commit()
|
||||
|
||||
res = db.get_radio_tracks("seed", limit=10)
|
||||
assert res["success"] is True
|
||||
ids = [t["id"] for t in res["tracks"]]
|
||||
assert "t2" in ids
|
||||
assert "seed" not in ids # seed always excluded
|
||||
|
||||
|
||||
def test_excludes_caller_supplied_ids(db):
|
||||
_add_artist(db, "ar1", "Artist One")
|
||||
_add_album(db, "al1", "ar1", "Album A")
|
||||
_add_album(db, "al2", "ar1", "Album B")
|
||||
_add_track(db, "seed", "al1", "ar1", "Seed")
|
||||
_add_track(db, "t2", "al2", "ar1", "T2")
|
||||
_add_track(db, "t3", "al2", "ar1", "T3")
|
||||
db._conn.commit()
|
||||
|
||||
res = db.get_radio_tracks("seed", limit=10, exclude_ids=["t2"])
|
||||
ids = [t["id"] for t in res["tracks"]]
|
||||
assert "t2" not in ids
|
||||
assert "t3" in ids
|
||||
|
||||
|
||||
def test_tier2_genre_match_other_artists(db):
|
||||
# No same-artist alternatives; falls to genre tier.
|
||||
_add_artist(db, "ar1", "Seed Artist", genres='["shoegaze"]')
|
||||
_add_artist(db, "ar2", "Other Artist", genres='["shoegaze"]')
|
||||
_add_album(db, "al1", "ar1", "Seed Album", genres='["shoegaze"]')
|
||||
_add_album(db, "al2", "ar2", "Other Album", genres='["shoegaze"]')
|
||||
_add_track(db, "seed", "al1", "ar1", "Seed")
|
||||
_add_track(db, "g1", "al2", "ar2", "Genre Match")
|
||||
db._conn.commit()
|
||||
|
||||
res = db.get_radio_tracks("seed", limit=10)
|
||||
ids = [t["id"] for t in res["tracks"]]
|
||||
assert "g1" in ids
|
||||
|
||||
|
||||
def test_tier4_random_fallback_fills_when_no_metadata_match(db):
|
||||
# Seed has no genre/mood/style and no same-artist alts → random tier.
|
||||
_add_artist(db, "ar1", "Seed Artist")
|
||||
_add_artist(db, "ar2", "Unrelated")
|
||||
_add_album(db, "al1", "ar1", "Seed Album")
|
||||
_add_album(db, "al2", "ar2", "Unrelated Album")
|
||||
_add_track(db, "seed", "al1", "ar1", "Seed")
|
||||
_add_track(db, "r1", "al2", "ar2", "Random One")
|
||||
db._conn.commit()
|
||||
|
||||
res = db.get_radio_tracks("seed", limit=10)
|
||||
ids = [t["id"] for t in res["tracks"]]
|
||||
assert "r1" in ids # filled from random tier
|
||||
|
||||
|
||||
def test_only_returns_tracks_with_files(db):
|
||||
_add_artist(db, "ar1", "Artist One")
|
||||
_add_album(db, "al1", "ar1", "Album A")
|
||||
_add_album(db, "al2", "ar1", "Album B")
|
||||
_add_track(db, "seed", "al1", "ar1", "Seed")
|
||||
_add_track(db, "nofile", "al2", "ar1", "No File", file_path="")
|
||||
db._conn.commit()
|
||||
|
||||
res = db.get_radio_tracks("seed", limit=10)
|
||||
ids = [t["id"] for t in res["tracks"]]
|
||||
assert "nofile" not in ids # file_path filter still enforced
|
||||
|
||||
|
||||
def test_no_duplicate_ids_across_tiers(db):
|
||||
# A track that qualifies for both same-artist AND genre must appear once.
|
||||
_add_artist(db, "ar1", "Artist One", genres='["pop"]')
|
||||
_add_album(db, "al1", "ar1", "Album A", genres='["pop"]')
|
||||
_add_album(db, "al2", "ar1", "Album B", genres='["pop"]')
|
||||
_add_track(db, "seed", "al1", "ar1", "Seed")
|
||||
_add_track(db, "dup", "al2", "ar1", "Could Match Twice")
|
||||
db._conn.commit()
|
||||
|
||||
res = db.get_radio_tracks("seed", limit=10)
|
||||
ids = [t["id"] for t in res["tracks"]]
|
||||
assert ids.count("dup") == 1
|
||||
@ -0,0 +1,139 @@
|
||||
"""Tests for the extracted radio-selection logic (Phase 0a of the player revamp).
|
||||
|
||||
These pin the behavior that used to be inline + untestable inside
|
||||
``database.music_database.get_radio_tracks``. They lock current behavior so
|
||||
Phase 2 (smart ranking) can evolve it against a green baseline.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from core.radio.selection import (
|
||||
RadioCollector,
|
||||
build_like_conditions,
|
||||
merge_tags,
|
||||
parse_tags,
|
||||
same_artist_cap,
|
||||
)
|
||||
|
||||
|
||||
class TestParseTags:
|
||||
def test_json_array(self):
|
||||
assert parse_tags('["rock", "indie"]') == ["rock", "indie"]
|
||||
|
||||
def test_comma_separated_legacy(self):
|
||||
assert parse_tags("rock, indie, folk") == ["rock", "indie", "folk"]
|
||||
|
||||
def test_comma_separated_strips_whitespace_and_blanks(self):
|
||||
assert parse_tags("rock, , indie ,") == ["rock", "indie"]
|
||||
|
||||
def test_empty_and_none(self):
|
||||
assert parse_tags("") == []
|
||||
assert parse_tags(None) == []
|
||||
|
||||
def test_non_list_json_scalar_wrapped(self):
|
||||
# A bare JSON scalar (e.g. a quoted string) becomes a single-item list.
|
||||
assert parse_tags('"rock"') == ["rock"]
|
||||
|
||||
def test_garbage_falls_back_to_split(self):
|
||||
assert parse_tags("not json at all") == ["not json at all"]
|
||||
|
||||
|
||||
class TestSameArtistCap:
|
||||
def test_thirty_percent(self):
|
||||
assert same_artist_cap(50) == 15 # 50 * 3 // 10
|
||||
assert same_artist_cap(20) == 6
|
||||
|
||||
def test_floored_at_five(self):
|
||||
assert same_artist_cap(10) == 5 # 3, floored to 5
|
||||
assert same_artist_cap(1) == 5
|
||||
|
||||
|
||||
class TestMergeTags:
|
||||
def test_dedupes_preserving_order(self):
|
||||
assert merge_tags(["rock", "indie"], ["indie", "folk"]) == ["rock", "indie", "folk"]
|
||||
|
||||
def test_empty_groups(self):
|
||||
assert merge_tags([], []) == []
|
||||
|
||||
|
||||
class TestBuildLikeConditions:
|
||||
def test_single_tag_two_columns(self):
|
||||
sql, params = build_like_conditions(["rock"], ("al.genres", "ar.genres"))
|
||||
assert sql == "al.genres LIKE ? OR ar.genres LIKE ?"
|
||||
assert params == ["%rock%", "%rock%"]
|
||||
|
||||
def test_grouping_matches_original_order(self):
|
||||
# Original emitted all album-col LIKEs, then all artist-col LIKEs;
|
||||
# params were [%t%...] * 2. Reproduce that ordering exactly.
|
||||
sql, params = build_like_conditions(["rock", "indie"], ("al.genres", "ar.genres"))
|
||||
assert sql == (
|
||||
"al.genres LIKE ? OR al.genres LIKE ? OR "
|
||||
"ar.genres LIKE ? OR ar.genres LIKE ?"
|
||||
)
|
||||
assert params == ["%rock%", "%indie%", "%rock%", "%indie%"]
|
||||
|
||||
def test_no_tags_returns_empty(self):
|
||||
assert build_like_conditions([], ("al.genres",)) == ("", [])
|
||||
|
||||
def test_no_columns_returns_empty(self):
|
||||
assert build_like_conditions(["rock"], ()) == ("", [])
|
||||
|
||||
|
||||
class TestRadioCollector:
|
||||
def _rows(self, *ids):
|
||||
return [{"id": i, "title": f"t{i}"} for i in ids]
|
||||
|
||||
def test_collects_and_dedupes(self):
|
||||
c = RadioCollector(limit=10)
|
||||
c.collect(self._rows(1, 2, 2, 3)) # dup 2 ignored
|
||||
assert [t["id"] for t in c.tracks] == [1, 2, 3]
|
||||
|
||||
def test_excludes_seed_and_caller_ids(self):
|
||||
c = RadioCollector(limit=10, exclude_ids=["1", "2"])
|
||||
c.collect(self._rows(1, 2, 3, 4))
|
||||
assert [t["id"] for t in c.tracks] == [3, 4]
|
||||
|
||||
def test_exclude_ids_coerced_to_str(self):
|
||||
# Caller may pass ints; seen-set stores strings.
|
||||
c = RadioCollector(limit=10, exclude_ids=[1])
|
||||
c.collect(self._rows(1, 2))
|
||||
assert [t["id"] for t in c.tracks] == [2]
|
||||
|
||||
def test_cap_bounds_a_single_tier(self):
|
||||
c = RadioCollector(limit=10)
|
||||
c.collect(self._rows(1, 2, 3, 4, 5), cap=2) # only 2 from this tier
|
||||
assert [t["id"] for t in c.tracks] == [1, 2]
|
||||
assert not c.filled
|
||||
assert c.remaining() == 8
|
||||
|
||||
def test_filled_at_limit(self):
|
||||
c = RadioCollector(limit=3)
|
||||
ret = c.collect(self._rows(1, 2, 3, 4))
|
||||
assert ret is True
|
||||
assert c.filled
|
||||
assert len(c.tracks) == 3
|
||||
assert c.remaining() == 0
|
||||
|
||||
def test_capped_collect_returns_true_at_cap_target(self):
|
||||
# Faithful to the original _collect: it returns True once the
|
||||
# cap-bounded target is hit, even below the overall limit. The DB
|
||||
# method IGNORES tier 1's capped return and checks .filled instead, so
|
||||
# this never causes early exit — but the contract must match exactly.
|
||||
c = RadioCollector(limit=5)
|
||||
assert c.collect(self._rows(1, 2), cap=2) is True # hit cap target (2)
|
||||
assert not c.filled # but not at limit (5)
|
||||
|
||||
def test_uncapped_collect_returns_true_only_at_limit(self):
|
||||
c = RadioCollector(limit=5)
|
||||
assert c.collect(self._rows(1, 2)) is False # below limit
|
||||
assert c.collect(self._rows(3, 4, 5)) is True # now at limit
|
||||
|
||||
def test_exclude_placeholders_and_values_track_seen_set(self):
|
||||
c = RadioCollector(limit=10, exclude_ids=["a", "b"])
|
||||
assert c.exclude_placeholders() == "?,?"
|
||||
assert set(c.exclude_values()) == {"a", "b"}
|
||||
# After collecting, already-collected IDs join the NOT-IN set so the
|
||||
# next tier's SQL won't re-pull them.
|
||||
c.collect(self._rows("c"))
|
||||
assert c.exclude_placeholders() == "?,?,?"
|
||||
assert set(c.exclude_values()) == {"a", "b", "c"}
|
||||
Loading…
Reference in new issue