You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/tests/test_video_api.py

1193 lines
59 KiB

"""Seam tests for the isolated /api/video blueprint (experimental branch).
Verifies the blueprint builds with its route, the dashboard endpoint returns
real (zeroed) JSON against an empty video.db, and that the video API package
imports nothing from the music side.
"""
from __future__ import annotations
from pathlib import Path
from flask import Flask
def _make_client(tmp_path):
# Inject a tmp-backed DB directly so the endpoint never falls back to the
# real default path (no stray database/video_library.db in the repo).
import api.video as videoapi
from database.video_database import VideoDatabase
videoapi._video_db = VideoDatabase(database_path=str(tmp_path / "video_library.db"))
app = Flask(__name__)
app.register_blueprint(videoapi.create_video_blueprint(), url_prefix="/api/video")
return app.test_client(), videoapi
def test_blueprint_exposes_dashboard_route():
from api.video import create_video_blueprint
app = Flask(__name__)
app.register_blueprint(create_video_blueprint(), url_prefix="/api/video")
rules = {r.rule for r in app.url_map.iter_rules()}
assert "/api/video/dashboard" in rules
assert "/api/video/scan/request" in rules
assert "/api/video/scan/status" in rules
assert "/api/video/scan/stop" in rules
assert "/api/video/scan/server" in rules
assert "/api/video/scan/server/status" in rules
assert "/api/video/library" in rules
assert "/api/video/libraries" in rules
assert "/api/video/server" in rules
assert any(r.startswith("/api/video/poster/") for r in rules)
assert "/api/video/enrichment/services" in rules
assert "/api/video/enrichment/<service>/status" in rules
assert "/api/video/enrichment/<service>/unmatched" in rules
assert "/api/video/enrichment/config" in rules
assert "/api/video/enrichment/<service>/test" in rules
assert "/api/video/detail/show/<int:show_id>" in rules
assert "/api/video/detail/movie/<int:movie_id>" in rules
assert "/api/video/detail/show/<int:show_id>/refresh-art" in rules
assert "/api/video/detail/movie/<int:movie_id>/refresh-art" in rules
assert "/api/video/detail/<kind>/<int:item_id>/extras" in rules
assert "/api/video/search" in rules
assert "/api/video/trending" in rules
assert "/api/video/tmdb/<kind>/<int:tmdb_id>" in rules
assert "/api/video/tmdb/show/<int:tv_id>/season/<int:season_number>" in rules
assert "/api/video/person/<int:tmdb_id>" in rules
assert "/api/video/episode/<int:tmdb_id>/<int:season>/<int:episode>" in rules
assert any(r.startswith("/api/video/backdrop/") for r in rules)
assert "/api/video/img" in rules
assert "/api/video/discover/hero" in rules
assert "/api/video/discover/genres" in rules
assert "/api/video/discover/taste" in rules
assert "/api/video/discover/list" in rules
assert "/api/video/discover/morelike" in rules
assert "/api/video/discover/trailer" in rules
def test_scan_request_threads_mode_and_media_type(tmp_path, monkeypatch):
# The Tools-page scan can target one library (movies / TV) or both. The
# endpoint must pass BOTH mode and media_type through to the scanner — movies
# and TV are independent, so a TV scan must never touch movies.
client, _ = _make_client(tmp_path)
calls = {}
class _FakeScanner:
def request_scan(self, source_factory, mode="full", media_type="all"):
calls["mode"] = mode
calls["media_type"] = media_type
return {"status": "started", "mode": mode, "media_type": media_type}
import core.video.scanner as scanner_mod
import core.video.sources as sources_mod
monkeypatch.setattr(scanner_mod, "get_video_scanner", lambda db: _FakeScanner())
monkeypatch.setattr(sources_mod, "get_active_video_source", lambda: None)
# default body → both libraries, full
assert client.post("/api/video/scan/request", json={}).get_json()["media_type"] == "all"
assert calls == {"mode": "full", "media_type": "all"}
# explicit movies-only deep scan
r = client.post("/api/video/scan/request", json={"mode": "deep", "media_type": "movie"})
assert calls == {"mode": "deep", "media_type": "movie"}
assert r.get_json() == {"status": "started", "mode": "deep", "media_type": "movie"}
# TV-only
client.post("/api/video/scan/request", json={"media_type": "show"})
assert calls["media_type"] == "show"
def test_server_scan_triggers_refresh_with_media_type(tmp_path, monkeypatch):
# The Server Scan tool tells the media server to rescan its OWN folders. The
# endpoint must thread media_type (Movies / TV / both) to the source refresh.
client, _ = _make_client(tmp_path)
calls = {}
import core.video.sources as sources_mod
def _refresh(media_type="all"):
calls["mt"] = media_type
return {"ok": True, "sections": [media_type]}
monkeypatch.setattr(sources_mod, "refresh_video_server_sections", _refresh)
r = client.post("/api/video/scan/server", json={"media_type": "movie"})
assert r.get_json() == {"ok": True, "sections": ["movie"]}
assert calls["mt"] == "movie"
calls.clear()
client.post("/api/video/scan/server", json={}) # default → both libraries
assert calls["mt"] == "all"
def test_server_scan_status_reports_scanning_flag(tmp_path, monkeypatch):
client, _ = _make_client(tmp_path)
import core.video.sources as sources_mod
seen = {}
def _inprog(media_type="all"):
seen["mt"] = media_type
return True
monkeypatch.setattr(sources_mod, "video_server_scan_in_progress", _inprog)
assert client.get("/api/video/scan/server/status?media_type=show").get_json() == {"scanning": True}
assert seen["mt"] == "show"
# None (adapter can't report) passes straight through as JSON null
monkeypatch.setattr(sources_mod, "video_server_scan_in_progress", lambda mt="all": None)
assert client.get("/api/video/scan/server/status").get_json() == {"scanning": None}
def test_discover_trailer_returns_key(tmp_path, monkeypatch):
client, _ = _make_client(tmp_path)
import core.video.enrichment.engine as eng_mod
class FakeEng:
def trailer(self, kind, tmdb_id): return {"key": "abc123", "name": "Official"}
monkeypatch.setattr(eng_mod, "get_video_enrichment_engine", lambda: FakeEng())
assert client.get("/api/video/discover/trailer?kind=movie&tmdb_id=5").get_json()["trailer"]["key"] == "abc123"
# a non-numeric id is rejected without touching the engine
assert client.get("/api/video/discover/trailer?kind=movie").get_json() == {"trailer": None}
def test_discover_morelike_builds_seeded_rails(tmp_path, monkeypatch):
client, vapi = _make_client(tmp_path)
db = vapi._video_db
db.upsert_movie("plex", {"server_id": "m1", "title": "Dune", "tmdb_id": 1, "file": {"relative_path": "a.mkv"}})
import core.video.enrichment.engine as eng_mod
class FakeEng:
def recommendations(self, kind, tmdb_id, page=1):
return [{"kind": "movie", "tmdb_id": 100 + i} for i in range(6)]
monkeypatch.setattr(eng_mod, "get_video_enrichment_engine", lambda: FakeEng())
rails = client.get("/api/video/discover/morelike").get_json()["rails"]
assert rails and rails[0]["title"] == "More like Dune"
assert len(rails[0]["items"]) == 6
def test_discover_list_pages_concatenates_and_dedupes(tmp_path, monkeypatch):
client, _ = _make_client(tmp_path)
import core.video.enrichment.engine as eng_mod
class FakeEng:
def discover_curated(self, key, page=1):
# page 1 → ids 1,2 ; page 2 → ids 2,3 (overlap on 2)
return ([{"kind": "movie", "tmdb_id": 1}, {"kind": "movie", "tmdb_id": 2}] if page == 1
else [{"kind": "movie", "tmdb_id": 2}, {"kind": "movie", "tmdb_id": 3}])
monkeypatch.setattr(eng_mod, "get_video_enrichment_engine", lambda: FakeEng())
r = client.get("/api/video/discover/list?key=popular_movies&pages=2")
assert [it["tmdb_id"] for it in r.get_json()["items"]] == [1, 2, 3] # concatenated + deduped
def test_discover_list_trending_fetches_once_despite_pages(tmp_path, monkeypatch):
client, _ = _make_client(tmp_path)
import core.video.enrichment.engine as eng_mod
calls = {"n": 0}
class FakeEng:
def trending(self):
calls["n"] += 1
return [{"kind": "movie", "tmdb_id": 9}]
monkeypatch.setattr(eng_mod, "get_video_enrichment_engine", lambda: FakeEng())
r = client.get("/api/video/discover/list?key=trending&pages=3")
assert calls["n"] == 1 # fixed list — not refetched per page
assert [it["tmdb_id"] for it in r.get_json()["items"]] == [9]
def test_img_proxy_rejects_non_tmdb(tmp_path):
client, _ = _make_client(tmp_path)
assert client.get("/api/video/img?u=https://evil.example.com/x.jpg").status_code == 404
assert client.get("/api/video/img").status_code == 404
def test_search_endpoint_empty_query(tmp_path):
client, _ = _make_client(tmp_path)
resp = client.get("/api/video/search?q=")
assert resp.status_code == 200
assert resp.get_json() == {"results": [], "query": ""}
def test_search_endpoint_uses_engine(tmp_path, monkeypatch):
client, _ = _make_client(tmp_path)
class FakeEngine:
def search(self, q): return [{"kind": "movie", "tmdb_id": 1, "title": "Dune", "library_id": None}]
monkeypatch.setattr("core.video.enrichment.engine.get_video_enrichment_engine",
lambda: FakeEngine())
body = client.get("/api/video/search?q=dune").get_json()
assert body["query"] == "dune" and body["results"][0]["title"] == "Dune"
def test_tmdb_detail_endpoint(tmp_path, monkeypatch):
client, _ = _make_client(tmp_path)
class FakeEngine:
def tmdb_detail(self, kind, tid): return {"source": "tmdb", "kind": kind, "id": tid, "title": "X"}
monkeypatch.setattr("core.video.enrichment.engine.get_video_enrichment_engine",
lambda: FakeEngine())
resp = client.get("/api/video/tmdb/movie/438631")
assert resp.status_code == 200 and resp.get_json()["source"] == "tmdb"
assert client.get("/api/video/tmdb/bogus/1").status_code == 400
def test_omdb_key_change_retries_unrated(tmp_path, monkeypatch):
client, videoapi = _make_client(tmp_path)
db = videoapi._video_db
mid = db.upsert_movie("plex", {"server_id": "m1", "title": "A", "imdb_id": "tt1"})
db.apply_ratings("movie", mid, {}) # burned: synced, but no rating
assert db.ratings_next() is None # not pending
monkeypatch.setattr("core.video.enrichment.engine.rebuild_video_enrichment_engine", lambda: None)
resp = client.post("/api/video/enrichment/config", json={"omdb_api_key": "NEWKEY"})
assert resp.status_code == 200
assert db.ratings_next() is not None # new key → re-queued for rating
def test_show_detail_endpoint(tmp_path):
client, videoapi = _make_client(tmp_path)
try:
sid = videoapi._video_db.upsert_show_tree("plex", {
"server_id": "s1", "title": "Show", "seasons": [
{"season_number": 1, "episodes": [
{"episode_number": 1, "title": "Pilot",
"file": {"relative_path": "e1.mkv", "size_bytes": 5}}]}]})
resp = client.get("/api/video/detail/show/%d" % sid)
assert resp.status_code == 200
d = resp.get_json()
assert d["kind"] == "show" and d["episode_total"] == 1 and d["episode_owned"] == 1
assert d["seasons"][0]["episodes"][0]["title"] == "Pilot"
assert client.get("/api/video/detail/show/999999").status_code == 404
finally:
videoapi._video_db = None
def test_enrichment_priority_endpoint(tmp_path):
client, videoapi = _make_client(tmp_path)
try:
assert client.get("/api/video/enrichment/priority").get_json()["priority"] == ""
r = client.post("/api/video/enrichment/priority", json={"priority": "show"})
assert r.status_code == 200 and r.get_json()["priority"] == "show"
assert client.get("/api/video/enrichment/priority").get_json()["priority"] == "show"
assert client.post("/api/video/enrichment/priority", json={"priority": "bogus"}).status_code == 400
finally:
videoapi._video_db = None
def test_monitor_toggle_endpoint(tmp_path):
client, videoapi = _make_client(tmp_path)
try:
sid = videoapi._video_db.upsert_show_tree("plex", {"server_id": "s1", "title": "S"})
r = client.post("/api/video/monitor", json={"kind": "show", "id": sid, "monitored": False})
assert r.status_code == 200 and r.get_json()["monitored"] is False
assert videoapi._video_db.show_detail(sid)["monitored"] is False
r2 = client.post("/api/video/monitor", json={"kind": "show", "id": sid, "monitored": True})
assert r2.status_code == 200 and videoapi._video_db.show_detail(sid)["monitored"] is True
# bad inputs
assert client.post("/api/video/monitor", json={"kind": "bogus", "id": sid}).status_code == 400
assert client.post("/api/video/monitor", json={"kind": "show", "id": 999999, "monitored": True}).status_code == 404
finally:
videoapi._video_db = None
def test_movie_detail_endpoint(tmp_path):
client, videoapi = _make_client(tmp_path)
try:
mid = videoapi._video_db.upsert_movie("plex", {"server_id": "m1", "title": "Dune"})
resp = client.get("/api/video/detail/movie/%d" % mid)
assert resp.status_code == 200
assert resp.get_json()["title"] == "Dune"
assert client.get("/api/video/detail/movie/999999").status_code == 404
finally:
videoapi._video_db = None
def test_dashboard_endpoint_returns_zeroed_json(tmp_path):
client, videoapi = _make_client(tmp_path)
try:
resp = client.get("/api/video/dashboard")
assert resp.status_code == 200
data = resp.get_json()
assert data["library"]["movies"] == 0
assert data["downloads"]["active"] == 0
assert data["watchlist"] == 0 and data["wishlist"] == 0
finally:
videoapi._video_db = None # don't leak the tmp DB to other tests
def test_library_endpoint_lists_content(tmp_path):
client, videoapi = _make_client(tmp_path)
try:
videoapi._video_db.upsert_movie("plex", {"server_id": "m1", "title": "A",
"poster_url": "/library/metadata/1/thumb/9"})
resp = client.get("/api/video/library?kind=movies")
assert resp.status_code == 200
data = resp.get_json()
assert [m["title"] for m in data["items"]] == ["A"]
assert data["items"][0]["has_poster"] is True # flag, not the raw path
assert "poster_url" not in data["items"][0] # don't leak server paths
assert data["pagination"]["total_count"] == 1
finally:
videoapi._video_db = None
def test_libraries_endpoint_lists_and_saves(tmp_path, monkeypatch):
client, videoapi = _make_client(tmp_path)
try:
import core.video.sources as vs
monkeypatch.setattr(vs, "list_video_libraries", lambda: {
"server": "plex", "movies": [{"title": "Movies"}], "tv": [{"title": "TV"}]})
import config.settings as cs
monkeypatch.setattr(cs.config_manager, "get_active_media_server", lambda: "plex")
data = client.get("/api/video/libraries").get_json()
assert data["server"] == "plex"
assert [m["title"] for m in data["movies"]] == ["Movies"]
assert data["selected"]["movies"] is None
assert client.post("/api/video/libraries", json={"movies": "Movies", "tv": "TV"}).status_code == 200
data2 = client.get("/api/video/libraries").get_json()
assert data2["selected"] == {"movies": "Movies", "tv": "TV"}
finally:
videoapi._video_db = None
def test_enrichment_endpoints(tmp_path):
import api.video as videoapi
from database.video_database import VideoDatabase
import core.video.enrichment.engine as eng_mod
from core.video.enrichment.engine import VideoEnrichmentEngine
class FakeClient:
enabled = True
def match(self, *a, **k): return None
def test(self): return (True, "ok")
db = VideoDatabase(database_path=str(tmp_path / "video_library.db"))
videoapi._video_db = db
eng_mod._engine = VideoEnrichmentEngine(db, {"tmdb": FakeClient(), "tvdb": FakeClient()})
app = Flask(__name__)
app.register_blueprint(videoapi.create_video_blueprint(), url_prefix="/api/video")
client = app.test_client()
try:
svc = client.get("/api/video/enrichment/services").get_json()
ids = {s["id"] for s in svc["services"]}
assert {"tmdb", "tvdb"} <= ids # matcher workers
assert {"ryd", "sponsorblock", "fanart", "opensubtitles"} <= ids # backfill workers
mid = db.upsert_movie("plex", {"server_id": "m1", "title": "X"})
st = client.get("/api/video/enrichment/tmdb/status").get_json()
assert st["enabled"] is True and st["stats"]["pending"] == 1
db.enrichment_apply("tmdb", "movie", mid, matched=False)
bd = client.get("/api/video/enrichment/tmdb/breakdown").get_json()
assert bd["breakdown"]["movie"]["not_found"] == 1
un = client.get("/api/video/enrichment/tmdb/unmatched?kind=movie&status=not_found").get_json()
assert un["total"] == 1 and un["kind"] == "movie"
assert client.post("/api/video/enrichment/tmdb/pause").get_json()["status"] == "paused"
assert client.post("/api/video/enrichment/tmdb/resume").get_json()["status"] == "running"
assert client.post("/api/video/enrichment/tmdb/retry",
json={"kind": "movie", "scope": "failed"}).get_json()["reset"] == 1
assert client.post("/api/video/enrichment/tmdb/test").get_json()["success"] is True
assert client.post("/api/video/enrichment/nope/test").status_code == 404
assert client.get("/api/video/enrichment/nope/status").status_code == 404
finally:
videoapi._video_db = None
eng_mod._engine = None
def test_enrichment_config_save_load(tmp_path, monkeypatch):
import api.video as videoapi
from database.video_database import VideoDatabase
import core.video.enrichment.engine as eng_mod
# Don't build a real engine (would open the default-path DB + start threads).
monkeypatch.setattr(eng_mod, "rebuild_video_enrichment_engine", lambda: None)
db = VideoDatabase(database_path=str(tmp_path / "video_library.db"))
videoapi._video_db = db
app = Flask(__name__)
app.register_blueprint(videoapi.create_video_blueprint(), url_prefix="/api/video")
client = app.test_client()
try:
assert client.get("/api/video/enrichment/config").get_json() == {
"tmdb_api_key": "", "tvdb_api_key": "", "omdb_api_key": "",
"fanart_api_key": "", "opensubtitles_api_key": "", "trakt_api_key": "",
"ryd_enabled": True, "sponsorblock_enabled": True, "dearrow_enabled": True,
"tvmaze_enabled": True, "anilist_enabled": False, "wikidata_enabled": True,
"billboard_autoplay": True, "watch_region": "US"}
client.post("/api/video/enrichment/config",
json={"tmdb_api_key": "abc", "tvdb_api_key": "xyz", "omdb_api_key": "om",
"fanart_api_key": "fa", "opensubtitles_api_key": "os",
"ryd_enabled": False, "sponsorblock_enabled": True,
"billboard_autoplay": False, "watch_region": "gb"})
assert client.get("/api/video/enrichment/config").get_json() == {
"tmdb_api_key": "abc", "tvdb_api_key": "xyz", "omdb_api_key": "om",
"fanart_api_key": "fa", "opensubtitles_api_key": "os", "trakt_api_key": "",
"ryd_enabled": False, "sponsorblock_enabled": True, "dearrow_enabled": True,
"tvmaze_enabled": True, "anilist_enabled": False, "wikidata_enabled": True,
"billboard_autoplay": False, "watch_region": "GB"}
assert db.get_setting("tmdb_api_key") == "abc" and db.get_setting("omdb_api_key") == "om"
assert client.get("/api/video/prefs").get_json() == {
"billboard_autoplay": False, "watch_region": "GB"}
finally:
videoapi._video_db = None
def test_downloads_config_save_load(tmp_path, monkeypatch):
import api.video as videoapi
import config.settings as cfg
from database.video_database import VideoDatabase
class _Cfg: # fake shared app config so the test never touches real music config
def __init__(self):
self._d = {}
def get(self, key, default=None):
return self._d.get(key, default)
def set(self, key, value):
self._d[key] = value
fake = _Cfg()
monkeypatch.setattr(cfg, "config_manager", fake, raising=False)
db = VideoDatabase(database_path=str(tmp_path / "video_library.db"))
videoapi._video_db = db
app = Flask(__name__)
app.register_blueprint(videoapi.create_video_blueprint(), url_prefix="/api/video")
client = app.test_client()
try:
# Defaults: empty folders, soulseek mode. Shared input + per-type libraries.
assert client.get("/api/video/downloads/config").get_json() == {
"download_path": "", "movies_path": "", "tv_path": "", "youtube_path": "",
"download_mode": "soulseek", "hybrid_order": ["soulseek"]}
# Round-trips: libraries → video.db, the INPUT folder → the SHARED music key.
client.post("/api/video/downloads/config",
json={"download_path": " /mnt/v/dl ", "movies_path": "/media/movies",
"tv_path": "/media/tv", "youtube_path": "/media/yt",
"download_mode": "hybrid", "hybrid_order": ["torrent", "usenet"]})
assert client.get("/api/video/downloads/config").get_json() == {
"download_path": "/mnt/v/dl", "movies_path": "/media/movies", # trimmed
"tv_path": "/media/tv", "youtube_path": "/media/yt",
"download_mode": "hybrid", "hybrid_order": ["torrent", "usenet"]}
# The input folder is the SHARED soulseek.download_path (so music sees it too);
# it is NOT stored in video.db.
assert fake.get("soulseek.download_path") == "/mnt/v/dl"
assert db.get_setting("download_path") is None
# Library paths DO persist to video.db.
assert db.get_setting("movies_path") == "/media/movies"
# Legacy single transfer_path migrates into Movies when movies_path is unset.
db.set_setting("movies_path", "")
db.set_setting("transfer_path", "/old/lib")
assert client.get("/api/video/downloads/config").get_json()["movies_path"] == "/old/lib"
finally:
videoapi._video_db = None
def test_quality_profile_endpoint_roundtrips(tmp_path):
import api.video as videoapi
from database.video_database import VideoDatabase
db = VideoDatabase(database_path=str(tmp_path / "video_library.db"))
videoapi._video_db = db
app = Flask(__name__)
app.register_blueprint(videoapi.create_video_blueprint(), url_prefix="/api/video")
client = app.test_client()
try:
# Default profile served when unset (rich-curated model: tier ladder + cutoff).
d = client.get("/api/video/downloads/quality").get_json()
assert [t["key"] for t in d["tiers"]][0] == "remux-2160p"
assert d["cutoff_resolution"] == "1080p" and d["prefer_codec"] == "hevc"
# POST normalizes + persists; bad codec rejected, loose 4K cutoff kept.
out = client.post("/api/video/downloads/quality",
json={"prefer_codec": "bogus", "max_movie_gb": 50,
"cutoff_resolution": "2160p", "prefer_hdr": "require"}).get_json()
assert out["prefer_codec"] == "hevc" and out["max_movie_gb"] == 50
assert out["cutoff_resolution"] == "2160p" and out["prefer_hdr"] == "require"
assert client.get("/api/video/downloads/quality").get_json()["max_movie_gb"] == 50
finally:
videoapi._video_db = None
def test_youtube_quality_profile_endpoint_roundtrips(tmp_path):
import api.video as videoapi
from database.video_database import VideoDatabase
db = VideoDatabase(database_path=str(tmp_path / "video_library.db"))
videoapi._video_db = db
app = Flask(__name__)
app.register_blueprint(videoapi.create_video_blueprint(), url_prefix="/api/video")
client = app.test_client()
try:
# Separate, smaller yt-dlp-shaped profile (no ladder/cutoff/rejects).
d = client.get("/api/video/downloads/youtube-quality").get_json()
assert d["max_resolution"] == "1080p" and d["container"] == "mp4"
# POST normalizes + persists; bad container rejected, valid resolution kept.
out = client.post("/api/video/downloads/youtube-quality",
json={"max_resolution": "2160p", "container": "avi",
"video_codec": "av1"}).get_json()
assert out["max_resolution"] == "2160p" and out["container"] == "mp4"
assert out["video_codec"] == "av1"
assert client.get("/api/video/downloads/youtube-quality").get_json()["max_resolution"] == "2160p"
finally:
videoapi._video_db = None
def test_quality_evaluate_endpoint_judges_owned_copy(tmp_path):
import api.video as videoapi
from database.video_database import VideoDatabase
db = VideoDatabase(database_path=str(tmp_path / "video_library.db"))
videoapi._video_db = db
app = Flask(__name__)
app.register_blueprint(videoapi.create_video_blueprint(), url_prefix="/api/video")
client = app.test_client()
try:
# Default profile cuts off at 1080p → a 720p copy is below target.
below = client.post("/api/video/downloads/evaluate",
json={"file": {"resolution": "720p", "video_codec": "x265"}}).get_json()
assert below["meets"] is False and below["resolution_label"] == "720p"
# A 1080p copy meets it.
ok = client.post("/api/video/downloads/evaluate",
json={"file": {"resolution": "1920x1080", "video_codec": "x265"}}).get_json()
assert ok["meets"] is True
finally:
videoapi._video_db = None
def test_downloads_search_endpoint_ranks_and_filters(tmp_path):
import api.video as videoapi
from database.video_database import VideoDatabase
db = VideoDatabase(database_path=str(tmp_path / "video_library.db"))
videoapi._video_db = db
app = Flask(__name__)
app.register_blueprint(videoapi.create_video_blueprint(), url_prefix="/api/video")
client = app.test_client()
try:
d = client.post("/api/video/downloads/search",
json={"scope": "movie", "title": "The Matrix", "year": 1999}).get_json()
assert d["scope"] == "movie" and d["results"]
# accepted hits sort ahead of rejected ones; the cam hit is rejected.
accepted = [r for r in d["results"] if r["accepted"]]
assert accepted and d["results"][0]["accepted"] is True
assert any(r["rejected"] and "reject" in r["rejected"] for r in d["results"]) # the HDCAM
# a season search returns season packs (validated against the profile/scope).
s = client.post("/api/video/downloads/search",
json={"scope": "season", "title": "The Wire", "season": 2}).get_json()
assert s["results"] and all(".S02" in r["title"] for r in s["results"])
finally:
videoapi._video_db = None
def test_downloads_grab_and_active(tmp_path, monkeypatch):
import api.video as videoapi
import core.video.download_monitor as mon
import core.video.slskd_download as slskd
from database.video_database import VideoDatabase
monkeypatch.setattr(slskd, "start_download", lambda *a, **k: {"ok": True})
monkeypatch.setattr(mon, "ensure_started", lambda *a, **k: None) # don't spawn the thread
db = VideoDatabase(database_path=str(tmp_path / "video_library.db"))
db.set_setting("movies_path", "/media/movies")
videoapi._video_db = db
app = Flask(__name__)
app.register_blueprint(videoapi.create_video_blueprint(), url_prefix="/api/video")
client = app.test_client()
try:
r = client.post("/api/video/downloads/grab", json={
"kind": "movie", "title": "The Matrix", "source": "soulseek",
"username": "neo", "filename": r"@@a\x\m.mkv", "size_bytes": 8, "quality_label": "1080p"})
out = r.get_json()
assert out["ok"] is True and out["id"] > 0
# tracked + routed to the Movies library.
act = client.get("/api/video/downloads/active").get_json()["downloads"]
assert len(act) == 1 and act[0]["status"] == "downloading"
assert act[0]["target_dir"] == "/media/movies" and act[0]["kind"] == "movie"
# missing source info → 400.
assert client.post("/api/video/downloads/grab",
json={"kind": "movie", "source": "soulseek"}).status_code == 400
# non-soulseek not wired yet → 400.
assert client.post("/api/video/downloads/grab",
json={"kind": "movie", "source": "torrent", "username": "u",
"filename": "f"}).status_code == 400
# clear removes only finished (none yet).
assert client.post("/api/video/downloads/clear").get_json()["cleared"] == 0
finally:
videoapi._video_db = None
def test_downloads_status_lookup_by_id_and_media(tmp_path, monkeypatch):
"""The live-tracking endpoint: the modal result card looks up by download id,
a movie detail page looks up by media_id+media_source. Powers both progress UIs."""
import api.video as videoapi
import core.video.download_monitor as mon
import core.video.slskd_download as slskd
from database.video_database import VideoDatabase
monkeypatch.setattr(slskd, "start_download", lambda *a, **k: {"ok": True})
monkeypatch.setattr(mon, "ensure_started", lambda *a, **k: None)
db = VideoDatabase(database_path=str(tmp_path / "video_library.db"))
db.set_setting("movies_path", "/media/movies")
videoapi._video_db = db
app = Flask(__name__)
app.register_blueprint(videoapi.create_video_blueprint(), url_prefix="/api/video")
client = app.test_client()
try:
gid = client.post("/api/video/downloads/grab", json={
"kind": "movie", "title": "The Matrix", "source": "soulseek",
"username": "neo", "filename": "m.mkv", "size_bytes": 8, "quality_label": "1080p",
"media_id": 42, "media_source": "library"}).get_json()["id"]
# by id
byid = client.get("/api/video/downloads/status?id=" + str(gid)).get_json()["download"]
assert byid and byid["id"] == gid and byid["status"] == "downloading"
# by media identity (what the movie detail page uses)
bym = client.get("/api/video/downloads/status?media_id=42&media_source=library").get_json()["download"]
assert bym and bym["id"] == gid
# a different movie / unknown id → null, never an error
assert client.get("/api/video/downloads/status?media_id=999&media_source=library").get_json()["download"] is None
assert client.get("/api/video/downloads/status?id=123456").get_json()["download"] is None
assert client.get("/api/video/downloads/status").get_json()["download"] is None
finally:
videoapi._video_db = None
def test_slskd_config_shared_via_config_manager(tmp_path, monkeypatch):
import api.video as videoapi
import config.settings as cfg
from database.video_database import VideoDatabase
class _Cfg:
def __init__(self):
self._d = {}
def get(self, key, default=None):
return self._d.get(key, default)
def set(self, key, value):
self._d[key] = value
fake = _Cfg()
monkeypatch.setattr(cfg, "config_manager", fake, raising=False)
db = VideoDatabase(database_path=str(tmp_path / "video_library.db"))
videoapi._video_db = db
app = Flask(__name__)
app.register_blueprint(videoapi.create_video_blueprint(), url_prefix="/api/video")
client = app.test_client()
try:
d = client.get("/api/video/downloads/slskd").get_json()
assert d["slskd_url"] == "http://localhost:5030" and d["search_timeout"] == 60
# Writes the SHARED soulseek.* keys (so the music side sees the same slskd).
client.post("/api/video/downloads/slskd",
json={"slskd_url": "http://nas:5030", "search_timeout": 90,
"auto_clear_searches": False})
assert fake.get("soulseek.slskd_url") == "http://nas:5030"
assert fake.get("soulseek.search_timeout") == 90
assert fake.get("soulseek.auto_clear_searches") is False
# NOT the video-specific paths (those live in video.db, never config_manager).
assert fake.get("soulseek.download_path") is None
assert client.get("/api/video/downloads/slskd").get_json()["slskd_url"] == "http://nas:5030"
finally:
videoapi._video_db = None
def test_video_api_imports_nothing_from_music():
base = Path(__file__).resolve().parent.parent / "api" / "video"
for py in base.glob("*.py"):
for line in py.read_text(encoding="utf-8").splitlines():
s = line.strip()
if s.startswith("import ") or s.startswith("from "):
assert "music" not in s.lower(), f"{py.name}: music import leaked: {s!r}"
# ── Watchlist endpoints (shows + people) ────────────────────────────────
def test_watchlist_routes_registered():
from api.video import create_video_blueprint
app = Flask(__name__)
app.register_blueprint(create_video_blueprint(), url_prefix="/api/video")
rules = {r.rule for r in app.url_map.iter_rules()}
for r in ("/api/video/watchlist", "/api/video/watchlist/add",
"/api/video/watchlist/remove", "/api/video/watchlist/check",
"/api/video/watchlist/counts"):
assert r in rules, r
def test_watchlist_add_check_list_remove_roundtrip(tmp_path):
client, _ = _make_client(tmp_path)
# empty to start
assert client.get("/api/video/watchlist").get_json() == {
"success": True, "shows": [], "people": [],
"counts": {"show": 0, "person": 0, "total": 0}}
# add a show + a person
r = client.post("/api/video/watchlist/add", json={
"kind": "show", "tmdb_id": 1399, "title": "Game of Thrones",
"poster_url": "/p.jpg", "library_id": 7})
assert r.get_json() == {"success": True, "watched": True}
client.post("/api/video/watchlist/add", json={
"kind": "person", "tmdb_id": 287, "title": "Brad Pitt"})
# list groups by kind
data = client.get("/api/video/watchlist").get_json()
assert data["counts"] == {"show": 1, "person": 1, "total": 2}
assert data["shows"][0]["title"] == "Game of Thrones"
assert data["people"][0]["tmdb_id"] == 287
# check (hydration) — only watched ids come back, keys are strings
chk = client.post("/api/video/watchlist/check",
json={"kind": "show", "tmdb_ids": [1399, 9999]}).get_json()
assert chk == {"success": True, "results": {"1399": True}}
# counts endpoint
assert client.get("/api/video/watchlist/counts").get_json() == {
"success": True, "show": 1, "person": 1, "total": 2}
# remove
rem = client.post("/api/video/watchlist/remove",
json={"kind": "show", "tmdb_id": 1399}).get_json()
assert rem["success"] is True and rem["watched"] is False and rem["removed"] is True
assert client.get("/api/video/watchlist/counts").get_json()["show"] == 0
def test_watchlist_add_validates_input(tmp_path):
client, _ = _make_client(tmp_path)
assert client.post("/api/video/watchlist/add", json={"kind": "movie", "tmdb_id": 1, "title": "x"}).status_code == 400
assert client.post("/api/video/watchlist/add", json={"kind": "show", "title": "no id"}).status_code == 400
assert client.post("/api/video/watchlist/add", json={"kind": "show", "tmdb_id": 1}).status_code == 400 # no title
assert client.post("/api/video/watchlist/remove", json={"kind": "person"}).status_code == 400
assert client.post("/api/video/watchlist/check", json={"tmdb_ids": [1]}).status_code == 400 # no kind
def test_watchlist_endpoint_paginates_and_searches(tmp_path):
client, _ = _make_client(tmp_path)
for i in range(1, 6):
client.post("/api/video/watchlist/add", json={"kind": "person", "tmdb_id": 300 + i, "title": "P%d" % i})
d = client.get("/api/video/watchlist?kind=person&page=1&limit=2").get_json()
assert d["success"] and len(d["items"]) == 2
assert d["pagination"]["total_count"] == 5 and d["pagination"]["total_pages"] == 3
assert d["counts"]["person"] == 5
s = client.get("/api/video/watchlist?kind=person&search=P3").get_json()
assert len(s["items"]) == 1 and s["items"][0]["title"] == "P3"
# ── wishlist endpoints ────────────────────────────────────────────────────────
def test_wishlist_add_movie_then_list(tmp_path):
client, _ = _make_client(tmp_path)
r = client.post("/api/video/wishlist/add", json={"movie": {"tmdb_id": 603, "title": "The Matrix", "year": 1999}})
assert r.get_json() == {"success": True, "added": 1, "counts": {"movie": 1, "show": 0, "episode": 0, "total": 1}}
lst = client.get("/api/video/wishlist?kind=movie").get_json()
assert lst["success"] and lst["items"][0]["tmdb_id"] == 603 and lst["counts"]["movie"] == 1
def test_wishlist_add_episodes_groups_into_show(tmp_path):
client, _ = _make_client(tmp_path)
r = client.post("/api/video/wishlist/add", json={
"show": {"tmdb_id": 1396, "title": "Breaking Bad", "poster_url": "/bb.jpg"},
"episodes": [{"season_number": 1, "episode_number": 1, "title": "Pilot"},
{"season_number": 1, "episode_number": 2}]})
assert r.get_json()["added"] == 2
show = client.get("/api/video/wishlist?kind=show").get_json()["items"][0]
assert show["tmdb_id"] == 1396 and show["wanted"] == 2
assert show["seasons"][0]["season_number"] == 1
def test_wishlist_add_requires_valid_body(tmp_path):
client, _ = _make_client(tmp_path)
assert client.post("/api/video/wishlist/add", json={}).status_code == 400
# show with no episodes is rejected (episodes are the atomic unit)
assert client.post("/api/video/wishlist/add", json={"show": {"tmdb_id": 1, "title": "S"}}).status_code == 400
def test_wishlist_remove_scopes_via_api(tmp_path):
client, _ = _make_client(tmp_path)
client.post("/api/video/wishlist/add", json={
"show": {"tmdb_id": 1396, "title": "Breaking Bad"},
"episodes": [{"season_number": 1, "episode_number": 1}, {"season_number": 1, "episode_number": 2}]})
r = client.post("/api/video/wishlist/remove",
json={"scope": "episode", "tmdb_id": 1396, "season_number": 1, "episode_number": 2})
assert r.get_json()["removed"] == 1 and r.get_json()["counts"]["episode"] == 1
assert client.post("/api/video/wishlist/remove", json={"scope": "show", "tmdb_id": 1396}).get_json()["removed"] == 1
assert client.post("/api/video/wishlist/remove", json={"scope": "bogus", "tmdb_id": 1}).status_code == 400
def test_wishlist_check_hydration(tmp_path):
client, _ = _make_client(tmp_path)
client.post("/api/video/wishlist/add", json={"movie": {"tmdb_id": 603, "title": "The Matrix"}})
client.post("/api/video/wishlist/add", json={
"show": {"tmdb_id": 1396, "title": "Breaking Bad"},
"episodes": [{"season_number": 2, "episode_number": 3}]})
res = client.post("/api/video/wishlist/check", json={"movie_ids": [603, 700], "show_tmdb_id": 1396}).get_json()
assert res["movies"] == [603] and res["episodes"] == ["2_3"]
def test_wishlist_routes_registered():
from flask import Flask
from api.video import create_video_blueprint
app = Flask(__name__)
app.register_blueprint(create_video_blueprint(), url_prefix="/api/video")
rules = {r.rule for r in app.url_map.iter_rules()}
for r in ("/api/video/wishlist", "/api/video/wishlist/add", "/api/video/wishlist/remove",
"/api/video/wishlist/check", "/api/video/wishlist/counts"):
assert r in rules
def test_wishlist_check_by_show(tmp_path):
client, _ = _make_client(tmp_path)
client.post("/api/video/wishlist/add", json={
"show": {"tmdb_id": 1396, "title": "BB"}, "episodes": [{"season_number": 1, "episode_number": 1}]})
res = client.post("/api/video/wishlist/check", json={"shows": [1396, 1399]}).get_json()
assert res["by_show"]["1396"] == ["1_1"] and "1399" not in res["by_show"]
def test_wishlist_backfill_art_endpoint(tmp_path, monkeypatch):
client, vapi = _make_client(tmp_path)
db = vapi._video_db
db.add_episodes_to_wishlist(1396, "BB", [{"season_number": 1, "episode_number": 1}]) # no art
import core.video.enrichment.engine as eng_mod
class FakeEng:
def tmdb_season(self, tv, sn):
return {"poster_url": "/s1.jpg", "episodes": [{"episode_number": 1, "still_url": "/s.jpg"}]}
monkeypatch.setattr(eng_mod, "get_video_enrichment_engine", lambda: FakeEng())
assert client.post("/api/video/wishlist/backfill-art").get_json()["success"] is True
season = db.query_wishlist("show")["items"][0]["seasons"][0]
assert season["poster_url"] == "/s1.jpg" and season["episodes"][0]["still_url"] == "/s.jpg"
# ── YouTube channels ─────────────────────────────────────────────────────────
def test_youtube_routes_registered():
from api.video import create_video_blueprint
app = Flask(__name__)
app.register_blueprint(create_video_blueprint(), url_prefix="/api/video")
rules = {r.rule for r in app.url_map.iter_rules()}
for r in ("/api/video/youtube/resolve", "/api/video/youtube/follow",
"/api/video/youtube/unfollow", "/api/video/youtube/channels",
"/api/video/youtube/wishlist", "/api/video/youtube/wishlist/remove"):
assert r in rules
_CHANNEL = {
"youtube_id": "UCPlay", "title": "PlayStation", "avatar_url": "http://a/p.jpg",
"handle": "@PlayStation", "videos": [
{"youtube_id": "v1", "title": "State of Play", "published_at": "2024-06-01",
"thumbnail_url": "http://t/1.jpg", "description": "the latest"},
{"youtube_id": "v2", "title": "Trailer", "published_at": "2024-01-01"},
]}
def test_youtube_resolve_previews_without_committing(tmp_path, monkeypatch):
client, videoapi = _make_client(tmp_path)
import core.video.youtube as ytmod
monkeypatch.setattr(ytmod, "resolve_channel", lambda url, limit=24: dict(_CHANNEL))
r = client.get("/api/video/youtube/resolve?url=https://youtube.com/@PlayStation")
data = r.get_json()
assert data["success"] is True
assert data["channel"]["youtube_id"] == "UCPlay"
assert data["following"] is False # not committed yet
# nothing was written
assert videoapi._video_db.youtube_wishlist_counts() == {"channel": 0, "video": 0}
def test_youtube_resolve_rejects_non_channel(tmp_path, monkeypatch):
client, _ = _make_client(tmp_path)
import core.video.youtube as ytmod
monkeypatch.setattr(ytmod, "resolve_channel", lambda url, limit=24: None)
r = client.get("/api/video/youtube/resolve?url=https://youtube.com/watch?v=x")
assert r.status_code == 404 and r.get_json()["success"] is False
def test_youtube_follow_then_channels_and_wishlist(tmp_path):
client, _ = _make_client(tmp_path)
# pre-resolved channel in the body → no network/yt-dlp needed
r = client.post("/api/video/youtube/follow", json={"channel": _CHANNEL})
data = r.get_json()
assert data["success"] is True and data["following"] is True
assert data["added_videos"] == 2
assert data["counts"] == {"channel": 1, "video": 2}
# appears on the watchlist channels list: wished_count = the 2 wished videos,
# video_count = the remembered catalog (0 here — nothing cached in this test).
chans = client.get("/api/video/youtube/channels").get_json()
assert chans["channels"][0]["youtube_id"] == "UCPlay"
assert chans["channels"][0]["wished_count"] == 2
assert chans["channels"][0]["video_count"] == 0
# appears in the youtube wishlist as a nebula channel (year=season, video=episode)
wl = client.get("/api/video/youtube/wishlist").get_json()
grp = wl["items"][0]
assert grp["youtube_id"] == "UCPlay" and grp["wanted"] == 2 and grp["source"] == "youtube"
vids = [e["source_id"] for se in grp["seasons"] for e in se["episodes"]]
assert set(vids) == {"v1", "v2"}
# resolve now reports following=True (hydration) — stub resolve to avoid network
import core.video.youtube as ytmod
ytmod_resolve = ytmod.resolve_channel
try:
ytmod.resolve_channel = lambda url, limit=24: dict(_CHANNEL)
rr = client.get("/api/video/youtube/resolve?url=@PlayStation").get_json()
assert rr["following"] is True
finally:
ytmod.resolve_channel = ytmod_resolve
def test_youtube_unfollow_and_remove_scopes(tmp_path):
client, db_api = _make_client(tmp_path)
client.post("/api/video/youtube/follow", json={"channel": _CHANNEL})
# unfollow removes the watchlist row but keeps wished videos
r = client.post("/api/video/youtube/unfollow", json={"youtube_id": "UCPlay"})
assert r.get_json() == {"success": True, "following": False}
assert client.get("/api/video/youtube/channels").get_json()["channels"] == []
assert db_api._video_db.youtube_wishlist_counts() == {"channel": 1, "video": 2}
# remove a single video
r = client.post("/api/video/youtube/wishlist/remove", json={"scope": "video", "source_id": "v1"})
assert r.get_json()["removed"] == 1
assert r.get_json()["counts"] == {"channel": 1, "video": 1}
# remove the whole channel's videos
r = client.post("/api/video/youtube/wishlist/remove", json={"scope": "channel", "source_id": "UCPlay"})
assert r.get_json()["counts"] == {"channel": 0, "video": 0}
def test_youtube_follow_requires_url_or_channel(tmp_path):
client, _ = _make_client(tmp_path)
r = client.post("/api/video/youtube/follow", json={})
assert r.status_code == 400
def test_youtube_channel_detail_hydrates_follow_and_wished(tmp_path, monkeypatch):
client, videoapi = _make_client(tmp_path)
import core.video.youtube as ytmod
monkeypatch.setattr(ytmod, "resolve_channel", lambda url, limit=60: dict(_CHANNEL))
monkeypatch.setattr(ytmod, "channel_recent_dates", lambda cid: {}) # no network in tests
# follow first so detail reports following + one wished video
client.post("/api/video/youtube/follow", json={"channel": _CHANNEL})
videoapi._video_db.remove_one_video_from_wishlist("v2") # leave only v1 wished
d = client.get("/api/video/youtube/channel/UCPlay").get_json()
assert d["success"] is True and d["kind"] == "channel" and d["following"] is True
wished = {v["youtube_id"]: v["wished"] for v in d["channel"]["videos"]}
assert wished == {"v1": True, "v2": False}
# the videos' dates got cached for year-seasons
assert videoapi._video_db.get_video_dates(["v1"]) == {"v1": "2024-06-01"}
def test_youtube_channel_detail_is_cache_first(tmp_path, monkeypatch):
client, videoapi = _make_client(tmp_path)
import core.video.youtube as ytmod
db = videoapi._video_db
# Pre-remember the channel (as the enricher / a prior open would have).
db.cache_channel_meta("UCmem", {"title": "Remembered", "avatar_url": "av", "subscriber_count": 9})
db.cache_channel_videos("UCmem", [{"youtube_id": "m1", "title": "M1", "thumbnail_url": "t1"}])
db.cache_video_dates([{"youtube_id": "m1", "published_at": "2021-07-07"}])
# A cache HIT must NOT touch the network (yt-dlp / RSS).
monkeypatch.setattr(ytmod, "resolve_channel",
lambda *a, **k: (_ for _ in ()).throw(AssertionError("resolved on a cache hit")))
monkeypatch.setattr(ytmod, "channel_recent_dates",
lambda *a, **k: (_ for _ in ()).throw(AssertionError("RSS on a cache hit")))
d = client.get("/api/video/youtube/channel/UCmem").get_json()
assert d["success"] is True and d["from_cache"] is True
assert d["channel"]["title"] == "Remembered" and d["channel"]["subscriber_count"] == 9
vids = d["channel"]["videos"]
assert len(vids) == 1 and vids[0]["youtube_id"] == "m1" and vids[0]["published_at"] == "2021-07-07"
def test_youtube_channel_detail_404_on_unresolvable(tmp_path, monkeypatch):
client, _ = _make_client(tmp_path)
import core.video.youtube as ytmod
monkeypatch.setattr(ytmod, "resolve_channel", lambda url, limit=60: None)
assert client.get("/api/video/youtube/channel/UCnope").status_code == 404
def test_youtube_channel_videos_batch_streams_and_merges(tmp_path, monkeypatch):
client, videoapi = _make_client(tmp_path)
import core.video.youtube as ytmod
# One POST = one InnerTube page; the token rides in the body (not the URL).
calls = []
def fake_page(cid, continuation=None, now=None, post=None):
calls.append(continuation)
return {"videos": [{"youtube_id": "a1", "title": "A1", "thumbnail_url": "ta", "published_at": "2020-01-01"},
{"youtube_id": "a2", "title": "A2", "thumbnail_url": "tb", "published_at": "2019-01-01"}],
"continuation": "NEXT" if continuation is None else None}
monkeypatch.setattr(ytmod, "innertube_channel_videos_page", fake_page)
# Seed a cached (exact) date for a1 + wish a2 so the merge is observable.
videoapi._video_db.cache_video_dates([{"youtube_id": "a1", "published_at": "2020-03-15"}])
videoapi._video_db.add_videos_to_wishlist({"youtube_id": "UCx", "title": "X"},
[{"youtube_id": "a2", "title": "A2"}])
r = client.post("/api/video/youtube/channel/UCx/videos", json={}).get_json()
assert r["success"] is True
assert r["continuation"] == "NEXT" # token passed back for the next batch
vids = {v["youtube_id"]: v for v in r["videos"]}
assert vids["a1"]["published_at"] == "2020-03-15" # cached date wins over InnerTube's
assert vids["a1"]["wished"] is False and vids["a2"]["wished"] is True
assert calls[0] is None # first batch started from page 1
# the streamed list is remembered for instant re-open
assert {v["youtube_id"] for v in videoapi._video_db.get_channel_videos("UCx")} == {"a1", "a2"}
# the next page is fetched by POSTing the returned token (kept out of the URL)
r2 = client.post("/api/video/youtube/channel/UCx/videos", json={"continuation": "NEXT"}).get_json()
assert r2["continuation"] is None and calls[1] == "NEXT"
def test_youtube_playlist_follow_detail_and_watchlist(tmp_path, monkeypatch):
client, videoapi = _make_client(tmp_path)
import core.video.youtube as ytmod
_PL = {"playlist_id": "PLx", "title": "Mix", "channel_title": "Lex", "video_count": 2,
"thumbnail_url": "t", "videos": [{"youtube_id": "a", "title": "A"}, {"youtube_id": "b", "title": "B"}]}
monkeypatch.setattr(ytmod, "parse_playlist_id", lambda u: "PLx" if "list=" in (u or "") else None)
monkeypatch.setattr(ytmod, "resolve_playlist", lambda *a, **k: dict(_PL))
monkeypatch.setattr(ytmod, "innertube_playlist_catalog", lambda *a, **k: {"videos": [], "total": None})
# resolve detects a playlist link → returns a playlist (not a channel)
r = client.get("/api/video/youtube/resolve?url=https://youtube.com/playlist?list=PLx").get_json()
assert r["success"] and r["playlist"]["playlist_id"] == "PLx" and r["following"] is False
# follow → appears under the watchlist's playlists
assert client.post("/api/video/youtube/playlist/follow", json={"playlist": _PL}).get_json()["following"] is True
assert [p["playlist_id"] for p in client.get("/api/video/youtube/channels").get_json()["playlists"]] == ["PLx"]
# detail (kept in curator order; flagged following)
d = client.get("/api/video/youtube/playlist/PLx").get_json()
assert d["kind"] == "playlist" and d["following"] is True
assert [v["youtube_id"] for v in d["playlist"]["videos"]] == ["a", "b"]
# unfollow
assert client.post("/api/video/youtube/playlist/unfollow", json={"playlist_id": "PLx"}).get_json()["following"] is False
assert client.get("/api/video/youtube/channels").get_json()["playlists"] == []
def test_youtube_wishlist_add_single_video(tmp_path):
client, _ = _make_client(tmp_path)
r = client.post("/api/video/youtube/wishlist/add", json={
"channel": {"youtube_id": "UCPlay", "title": "PlayStation"},
"videos": [{"youtube_id": "solo1", "title": "One Video", "published_at": "2024-03-03"}]})
assert r.get_json()["added"] == 1
assert r.get_json()["counts"] == {"channel": 1, "video": 1}
r = client.post("/api/video/youtube/wishlist/remove", json={"scope": "video", "source_id": "solo1"})
assert r.get_json()["counts"] == {"channel": 0, "video": 0}
def test_youtube_video_detail_endpoint_persists_description(tmp_path, monkeypatch):
client, videoapi = _make_client(tmp_path)
import core.video.youtube as ytmod
full = {"youtube_id": "v1", "title": "State of Play", "description": "Full synopsis here.",
"duration_seconds": 600, "view_count": 5000, "like_count": 100,
"published_at": "2024-06-01", "webpage_url": "https://youtu.be/v1", "tags": []}
monkeypatch.setattr(ytmod, "video_detail", lambda vid: dict(full))
# wish the video first (empty overview), then the detail call should backfill it
videoapi._video_db.add_videos_to_wishlist({"youtube_id": "UCx", "title": "X"},
[{"youtube_id": "v1", "title": "SoP"}])
d = client.get("/api/video/youtube/video/v1").get_json()
assert d["success"] is True and d["video"]["description"] == "Full synopsis here."
# persisted onto the wishlist row
grp = videoapi._video_db.query_youtube_wishlist()["items"][0]
ov = grp["seasons"][0]["episodes"][0]["overview"]
assert ov == "Full synopsis here."
def test_youtube_video_detail_404(tmp_path, monkeypatch):
client, _ = _make_client(tmp_path)
import core.video.youtube as ytmod
monkeypatch.setattr(ytmod, "video_detail", lambda vid: None)
assert client.get("/api/video/youtube/video/nope").status_code == 404
def test_img_proxy_allows_youtube_cdn_only(tmp_path, monkeypatch):
client, _ = _make_client(tmp_path)
import requests
class FakeResp:
status_code = 200
headers = {"Content-Type": "image/jpeg"}
def iter_content(self, n): yield b"x"
monkeypatch.setattr(requests, "get", lambda *a, **k: FakeResp())
assert client.get("/api/video/img?u=https://yt3.googleusercontent.com/abc=s900").status_code == 200
assert client.get("/api/video/img?u=https://i.ytimg.com/vi/x/hq.jpg").status_code == 200
assert client.get("/api/video/img?u=https://image.tmdb.org/t/p/w500/x.jpg").status_code == 200
# still SSRF-safe: arbitrary hosts rejected
assert client.get("/api/video/img?u=https://evil.example.com/x.jpg").status_code == 404
assert client.get("/api/video/img?u=http://yt3.googleusercontent.com/x").status_code == 404 # http
def test_youtube_playlists_and_playlist_videos(tmp_path, monkeypatch):
client, videoapi = _make_client(tmp_path)
import core.video.youtube as ytmod
monkeypatch.setattr(ytmod, "channel_playlists",
lambda cid: [{"playlist_id": "PL1", "title": "Trailers", "video_count": 3, "thumbnail_url": None}])
pls = client.get("/api/video/youtube/playlists/UCx").get_json()
assert pls["success"] is True and pls["playlists"][0]["playlist_id"] == "PL1"
monkeypatch.setattr(ytmod, "resolve_playlist",
lambda *a, **k: {"playlist_id": "PL1", "title": "Trailers",
"videos": [{"youtube_id": "a", "title": "A"},
{"youtube_id": "b", "title": "B"}]})
monkeypatch.setattr(ytmod, "innertube_playlist_catalog", lambda *a, **k: {"videos": [], "total": None})
# 'a' is wished → should hydrate wished=True
videoapi._video_db.add_videos_to_wishlist({"youtube_id": "UCx", "title": "X"}, [{"youtube_id": "a", "title": "A"}])
pv = client.get("/api/video/youtube/playlist/PL1").get_json()
wished = {v["youtube_id"]: v["wished"] for v in pv["videos"]} # still top-level `videos` for the expansion
assert wished == {"a": True, "b": False}
def test_youtube_search_endpoint_hydrates_following(tmp_path, monkeypatch):
client, videoapi = _make_client(tmp_path)
import core.video.youtube as ytmod
monkeypatch.setattr(ytmod, "search_channels", lambda q: [
{"youtube_id": "UCaaa", "title": "GMM"}, {"youtube_id": "UCbbb", "title": "Other"}])
videoapi._video_db.add_channel_to_watchlist({"youtube_id": "UCaaa", "title": "GMM"})
d = client.get("/api/video/youtube/search?q=gmm").get_json()
flags = {c["youtube_id"]: c["following"] for c in d["channels"]}
assert flags == {"UCaaa": True, "UCbbb": False}
assert client.get("/api/video/youtube/search?q=").get_json()["channels"] == []
def test_enrichment_youtube_status_route(tmp_path):
client, _ = _make_client(tmp_path)
d = client.get("/api/video/enrichment/youtube/status").get_json()
assert d["enabled"] is True and "running" in d and "progress" in d
assert client.post("/api/video/enrichment/youtube/pause").get_json()["status"] == "paused"
assert client.post("/api/video/enrichment/youtube/resume").get_json()["status"] == "running"
# unknown service still 404s
assert client.get("/api/video/enrichment/bogus/status").status_code == 404
def test_download_history_endpoints(tmp_path):
client, videoapi = _make_client(tmp_path)
db = videoapi._video_db
db.record_download_history({
"id": 1, "kind": "movie", "title": "Dune", "year": 2024, "status": "completed",
"release_title": "Dune.2024.2160p.x265", "dest_path": "/m/Dune.mkv",
"size_bytes": 9_000_000_000, "completed_at": "2026-06-20 10:30:00"})
# list + counts
r = client.get("/api/video/downloads/history")
assert r.status_code == 200
body = r.get_json()
assert body["success"] and body["counts"]["movie"] == 1
assert body["items"][0]["title"] == "Dune"
assert body["items"][0]["resolution"] == "2160p"
# kind filter
assert client.get("/api/video/downloads/history?kind=show").get_json()["pagination"]["total_count"] == 0
# detail
hid = body["items"][0]["id"]
d = client.get(f"/api/video/downloads/history/{hid}").get_json()
assert d["success"] and d["item"]["dest_path"] == "/m/Dune.mkv"
assert client.get("/api/video/downloads/history/99999").status_code == 404
def test_download_history_routes_registered():
from api.video import create_video_blueprint
app = Flask(__name__)
app.register_blueprint(create_video_blueprint(), url_prefix="/api/video")
rules = {r.rule for r in app.url_map.iter_rules()}
assert "/api/video/downloads/history" in rules
assert "/api/video/downloads/history/<int:history_id>" in rules