From bfbf94637b8e771b8562afcef7f4c5dc231abefb Mon Sep 17 00:00:00 2001 From: Nate Date: Fri, 19 Jun 2026 14:16:46 -0400 Subject: [PATCH] feat(#5): play YouTube/yt-dlp URLs and playlists, rebased on new pipeline Pass any yt-dlp-supported URL where a video file goes. A single video is downloaded (<=480p, ASCILINE only needs a tiny grid), normalized to H.264/AAC/CFR mp4, and cached in videos/ by id so replays and --loop are instant. A playlist/channel URL expands into one queue entry per video, each fetched lazily as it plays so a long playlist starts immediately. Also fixes the playlist.json eager-download bug Yusuf reported: URL entries in a JSON playlist are now left unresolved by load_playlist and fetched on demand by the playback loop, instead of synchronously downloading every link before the server starts. Ported onto the new thread-pool/zero-copy main; all integration points (resolve_video_path, load_playlist, build_queue, websocket loop) updated. --- README.md | 20 +++ stream_server.py | 56 +++++++- test_ytdl.py | 63 +++++++++ test_ytdl_hardening.py | 80 ++++++++++++ test_ytdl_normalize.py | 113 ++++++++++++++++ ytdl.py | 287 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 615 insertions(+), 4 deletions(-) create mode 100644 test_ytdl.py create mode 100644 test_ytdl_hardening.py create mode 100644 test_ytdl_normalize.py create mode 100644 ytdl.py diff --git a/README.md b/README.md index 4e7f75f..22e3a56 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,13 @@ cd ASCILINE ```bash pip install fastapi uvicorn opencv-python numpy websockets ``` + +**Optional — play from YouTube (and other yt-dlp sites):** +```bash +pip install yt-dlp +``` +Only needed if you pass a URL instead of a local file. Local playback works +without it. URL playback also uses FFmpeg (see below) to normalize downloads. ### 🔈 Audio Support (FFmpeg Required) To enable server-side audio processing (Volume 1-5), you must have FFmpeg installed. @@ -111,6 +118,19 @@ If you get a `FileNotFoundError` or don't want to modify system variables: python stream_server.py video.mp4 --cols 240 ``` +**YouTube / URL (requires `yt-dlp`):** pass any yt-dlp-supported URL in place of a file. +```bash +python stream_server.py "https://youtu.be/VIDEO_ID" --cols 240 +python stream_server.py "https://www.youtube.com/playlist?list=..." --cols 220 --loop +``` +A single video is downloaded (≤480p — ASCILINE only needs a tiny grid) and cached +in `videos/` by id, so replays are instant. A playlist/channel URL expands into one +queue entry per video, each fetched on demand as it plays. A `playlist.json` may +also list URLs; those are fetched lazily too, so the server starts immediately +instead of downloading the whole list up front. Every download is normalized to a +standard H.264/AAC/constant-frame-rate mp4 so playback and audio stay reliable +regardless of the source codec. + **Folder mode — drop your videos into `videos/` and run:** ```bash python stream_server.py --folder videos --cols 200 diff --git a/stream_server.py b/stream_server.py index 7e56479..d042fb9 100644 --- a/stream_server.py +++ b/stream_server.py @@ -26,6 +26,7 @@ from websockets.exceptions import ConnectionClosed # Import the existing engine (ascii_video_player2.py) from ascii_video_player2 import VideoDecoder, AsciiMapper from codec import encode_frame +import ytdl app = FastAPI() @@ -73,11 +74,15 @@ def get_html_content(): def resolve_video_path(video: str) -> str: """ Resolves a video path by checking multiple locations in order: + 0. If it's a URL (YouTube, etc.) -> download via yt-dlp and use that file 1. As-is (absolute or relative to CWD) 2. Inside the project root (BASE_DIR) 3. Inside BASE_DIR/videos/ subfolder Returns the first path that exists, or the original string if none found. """ + if ytdl.is_url(video): + return ytdl.download(video, cache_dir=os.path.join(BASE_DIR, "videos")) + candidates = [ video, os.path.join(BASE_DIR, video), @@ -89,11 +94,19 @@ def resolve_video_path(video: str) -> str: return video # Return original; error will be caught during playback def load_playlist(playlist_path: str) -> list[dict]: - """Loads playlist from a JSON file and resolves all video paths.""" + """ + Loads a playlist from a JSON file and resolves local video paths. + + URL entries (YouTube, etc.) are left unresolved on purpose: resolving a + URL means downloading it, and eagerly downloading every link would block + startup until the whole playlist is on disk. Unresolved URLs are fetched + lazily by the playback loop the first time each one is about to play. + """ with open(playlist_path, "r", encoding="utf-8") as f: items = json.load(f) for item in items: - item["video"] = resolve_video_path(item["video"]) + if not ytdl.is_url(item["video"]): + item["video"] = resolve_video_path(item["video"]) return items def load_folder(folder_path: str, default_mode: int, default_vol: int) -> list[dict]: @@ -146,9 +159,23 @@ def build_queue(args) -> list[dict]: item["rows"] = args.rows return items - # Legacy: single video argument + # Single positional argument: a local file/path, or a URL. + # A URL may be a playlist/channel → expand it into one entry per video. default_cols = args.cols if args.cols is not None else (450 if args.pixel else 200) - return [{"video": resolve_video_path(args.video), "mode": args.mode, "vol": args.vol, "pixel": args.pixel, "cols": default_cols, "rows": args.rows}] + base = {"mode": args.mode, "vol": args.vol, "pixel": args.pixel, + "cols": default_cols, "rows": args.rows} + + if ytdl.is_url(args.video): + urls = ytdl.expand_playlist(args.video) + if len(urls) > 1: + print(f"[YT] playlist expanded → {len(urls)} videos " + f"(each downloaded on demand as it plays)") + # Keep URLs unresolved; the playback loop downloads each lazily so a + # long playlist doesn't block startup, and the cache makes replays + # (and --loop) instant. + return [{"video": u, **base} for u in urls] + + return [{"video": resolve_video_path(args.video), **base}] # ── APP STATE ────────────────────────────────────────────── @@ -386,6 +413,27 @@ async def websocket_endpoint(websocket: WebSocket): while True: entry = queue[queue_index] video_path = entry["video"] + + # Lazy resolve: an unresolved URL entry (a single URL, an expanded + # playlist item, or a URL from a playlist.json) is downloaded the + # first time it is about to play, then the local path is cached back + # into the queue so /audio and any --loop replay reuse the file + # instead of re-downloading. + if ytdl.is_url(video_path): + print(f"[YT] fetching ({queue_index + 1}/{len(queue)}) {video_path}") + try: + video_path = resolve_video_path(video_path) + entry["video"] = video_path + except Exception as e: + await websocket.send_text(f"Error: could not fetch '{video_path}': {e}") + queue_index += 1 + if queue_index >= len(queue): + if loop: + queue_index = 0 + else: + break + continue + render_mode= entry["mode"] pixel_mode = entry.get("pixel", False) cols = entry.get("cols", 200) diff --git a/test_ytdl.py b/test_ytdl.py new file mode 100644 index 0000000..6c19ac3 --- /dev/null +++ b/test_ytdl.py @@ -0,0 +1,63 @@ +"""Unit tests for ytdl.expand_playlist parsing (no network — _ytdlp is mocked).""" +import json +import subprocess +from unittest import mock + +import ytdl + + +def _cp(stdout="", returncode=0, stderr=""): + return subprocess.CompletedProcess(args=[], returncode=returncode, + stdout=stdout, stderr=stderr) + + +def test_expand_playlist_returns_each_entry_url(): + info = { + "_type": "playlist", + "entries": [ + {"id": "aaa", "url": "https://www.youtube.com/watch?v=aaa"}, + {"id": "bbb", "url": "https://www.youtube.com/watch?v=bbb"}, + ], + } + with mock.patch.object(ytdl, "_ytdlp", return_value=_cp(json.dumps(info))): + assert ytdl.expand_playlist("https://youtube.com/playlist?list=PL") == [ + "https://www.youtube.com/watch?v=aaa", + "https://www.youtube.com/watch?v=bbb", + ] + + +def test_expand_playlist_builds_url_from_bare_id(): + # Some yt-dlp versions emit a bare id in 'url' for flat entries. + info = {"_type": "playlist", "entries": [{"id": "xyz", "url": "xyz"}]} + with mock.patch.object(ytdl, "_ytdlp", return_value=_cp(json.dumps(info))): + assert ytdl.expand_playlist("https://youtube.com/playlist?list=PL") == [ + "https://www.youtube.com/watch?v=xyz", + ] + + +def test_expand_playlist_single_video_returns_input_unchanged(): + info = {"_type": "video", "id": "single"} + url = "https://youtu.be/single" + with mock.patch.object(ytdl, "_ytdlp", return_value=_cp(json.dumps(info))): + assert ytdl.expand_playlist(url) == [url] + + +def test_expand_playlist_falls_back_on_ytdlp_failure(): + url = "https://youtu.be/whatever" + with mock.patch.object(ytdl, "_ytdlp", return_value=_cp("", returncode=1, stderr="boom")): + assert ytdl.expand_playlist(url) == [url] + + +def test_expand_playlist_falls_back_on_bad_json(): + url = "https://youtu.be/whatever" + with mock.patch.object(ytdl, "_ytdlp", return_value=_cp("not json{")): + assert ytdl.expand_playlist(url) == [url] + + +def test_expand_playlist_skips_unusable_entries(): + info = {"_type": "playlist", "entries": [{"title": "no id or url"}, + {"id": "ok"}]} + with mock.patch.object(ytdl, "_ytdlp", return_value=_cp(json.dumps(info))): + assert ytdl.expand_playlist("https://youtube.com/playlist?list=PL") == [ + "https://www.youtube.com/watch?v=ok", + ] diff --git a/test_ytdl_hardening.py b/test_ytdl_hardening.py new file mode 100644 index 0000000..0ad9b41 --- /dev/null +++ b/test_ytdl_hardening.py @@ -0,0 +1,80 @@ +""" +Hardening tests for ytdl: missing dependency, livestreams, atomic caching, +and audio-only sources. All but one are offline (yt-dlp/ffprobe are mocked). +""" +import shutil +import subprocess +from unittest import mock + +import pytest + +import ytdl + + +def _cp(stdout="", returncode=0, stderr=""): + return subprocess.CompletedProcess(args=[], returncode=returncode, + stdout=stdout, stderr=stderr) + + +def test_missing_ytdlp_gives_actionable_error(tmp_path): + with mock.patch("importlib.util.find_spec", return_value=None): + with pytest.raises(RuntimeError, match="pip install yt-dlp"): + ytdl.download("https://youtu.be/x", cache_dir=str(tmp_path)) + + +def test_download_rejects_livestream(tmp_path): + # _probe_remote sees id on line 1, is_live=True on line 2. + with mock.patch("importlib.util.find_spec", return_value=object()), \ + mock.patch.object(ytdl, "_ytdlp", return_value=_cp("vid123\nTrue\n")): + with pytest.raises(RuntimeError, match="live stream"): + ytdl.download("https://youtu.be/live", cache_dir=str(tmp_path)) + + +def test_download_is_atomic_on_normalize_failure(tmp_path): + """A failed normalize must leave no cache file a later run would trust.""" + out = tmp_path / "vid123.mp4" + + def fake_ytdlp(*args, **kwargs): + if "is_live" in args: # _probe_remote + return _cp("vid123\nFalse\n") + if "-o" in args: # the download itself + target = args[args.index("-o") + 1] + with open(target, "wb") as f: # simulate a downloaded file + f.write(b"\x00\x00") + return _cp("ok") + return _cp("") + + with mock.patch("importlib.util.find_spec", return_value=object()), \ + mock.patch.object(ytdl, "_ytdlp", side_effect=fake_ytdlp), \ + mock.patch.object(ytdl, "normalize", side_effect=RuntimeError("boom")): + with pytest.raises(RuntimeError, match="boom"): + ytdl.download("https://youtu.be/x", cache_dir=str(tmp_path)) + + assert not out.exists() # no poisoned cache + assert not (tmp_path / "vid123.mp4.part.mp4").exists() # temp cleaned up + + +def test_cached_file_short_circuits_without_download(tmp_path): + out = tmp_path / "vid123.mp4" + out.write_bytes(b"already here") + + def fake_ytdlp(*args, **kwargs): + if "is_live" in args: + return _cp("vid123\nFalse\n") + raise AssertionError("must not download when cached") + + with mock.patch("importlib.util.find_spec", return_value=object()), \ + mock.patch.object(ytdl, "_ytdlp", side_effect=fake_ytdlp): + assert ytdl.download("https://youtu.be/x", cache_dir=str(tmp_path)) == str(out) + + +@pytest.mark.skipif(not shutil.which("ffmpeg"), reason="ffmpeg required") +def test_normalize_rejects_audio_only(tmp_path): + audio = tmp_path / "audio_only.mp4" + r = subprocess.run( + ["ffmpeg", "-y", "-f", "lavfi", "-i", "sine=frequency=440:duration=1", + "-c:a", "aac", "-loglevel", "error", str(audio)], + capture_output=True, text=True) + assert r.returncode == 0, r.stderr + with pytest.raises(RuntimeError, match="no video stream"): + ytdl.normalize(str(audio)) diff --git a/test_ytdl_normalize.py b/test_ytdl_normalize.py new file mode 100644 index 0000000..de7de06 --- /dev/null +++ b/test_ytdl_normalize.py @@ -0,0 +1,113 @@ +""" +Deterministic tests for ytdl.normalize() — no network required. + +We synthesize the exact kinds of files YouTube serves that broke the engine +(VP9 video + Opus audio inside an mp4 container, and variable-frame-rate video) +and assert that normalize() turns them into canonical H.264 / AAC / CFR mp4s the +engine can open and time correctly. +""" +import shutil +import subprocess + +import cv2 +import pytest + +import ytdl + + +def _run(*args): + return subprocess.run(args, capture_output=True, text=True) + + +def _has_encoders(*names): + """True only if ffmpeg exists and lists every requested encoder.""" + if not shutil.which("ffmpeg"): + return False + out = _run("ffmpeg", "-hide_banner", "-encoders").stdout + return all(name in out for name in names) + + +# Building the *broken* inputs needs these encoders; the fix itself only needs +# libx264/aac. Skip cleanly on a minimal ffmpeg instead of failing CI. +requires_vp9_opus = pytest.mark.skipif( + not _has_encoders("libvpx-vp9", "libopus"), + reason="ffmpeg without libvpx-vp9/libopus; cannot synthesize the broken input") +requires_x264 = pytest.mark.skipif( + not _has_encoders("libx264"), reason="ffmpeg without libx264") + + +def _make_vp9_opus_mp4(path): + """A VP9+Opus stream copied into an mp4 — non-standard, exactly what + `--merge-output-format mp4` produces from YouTube's 'best' streams.""" + src = str(path) + ".src.mkv" + _run("ffmpeg", "-y", + "-f", "lavfi", "-i", "testsrc=size=320x240:rate=24:duration=1", + "-f", "lavfi", "-i", "sine=frequency=440:duration=1", + "-c:v", "libvpx-vp9", "-b:v", "200k", "-c:a", "libopus", + "-loglevel", "error", src) + # copy (not re-encode) into mp4 -> opus-in-mp4, the broken container + r = _run("ffmpeg", "-y", "-i", src, "-c", "copy", "-loglevel", "error", str(path)) + assert r.returncode == 0, r.stderr + + +def _make_vfr_mp4(path): + """An H.264 mp4 whose nominal rate disagrees with its average rate (VFR).""" + r = _run("ffmpeg", "-y", + "-f", "lavfi", "-i", "testsrc=size=320x240:rate=60:duration=1", + "-vf", "select='not(mod(n,3))'", # drop 2 of every 3 frames -> VFR + "-fps_mode", "vfr", "-c:v", "libx264", "-an", + "-loglevel", "error", str(path)) + assert r.returncode == 0, r.stderr + + +def _audio_decodes(path): + r = _run("ffmpeg", "-v", "error", "-i", str(path), "-t", "0.5", "-f", "null", "-") + return r.returncode == 0 and "Invalid data" not in r.stderr + + +@requires_vp9_opus +def test_vp9_opus_mp4_is_repaired(tmp_path): + bad = tmp_path / "bad.mp4" + _make_vp9_opus_mp4(bad) + + info = ytdl._probe(str(bad)) + assert info["vcodec"] == "vp9" # confirms we built the broken input + assert info["acodec"] == "opus" + + assert ytdl.normalize(str(bad)) is True # it had to be repaired + + fixed = ytdl._probe(str(bad)) + assert fixed["vcodec"] == "h264" + assert fixed["acodec"] == "aac" + assert fixed["cfr"] is True + assert _audio_decodes(bad) # /audio extraction now works + + cap = cv2.VideoCapture(str(bad)) # OpenCV can open + read it + ok, _ = cap.read() + fps = cap.get(cv2.CAP_PROP_FPS) + cap.release() + assert ok + assert abs(fps - 24) < 0.5 # engine sees the real, stable FPS + + +@requires_x264 +def test_vfr_is_made_constant(tmp_path): + bad = tmp_path / "vfr.mp4" + _make_vfr_mp4(bad) + assert ytdl._probe(str(bad))["cfr"] is False + + assert ytdl.normalize(str(bad)) is True + assert ytdl._probe(str(bad))["cfr"] is True + + +@requires_x264 +def test_clean_file_is_left_alone(tmp_path): + good = tmp_path / "good.mp4" + r = _run("ffmpeg", "-y", + "-f", "lavfi", "-i", "testsrc=size=320x240:rate=24:duration=1", + "-f", "lavfi", "-i", "sine=frequency=440:duration=1", + "-c:v", "libx264", "-pix_fmt", "yuv420p", "-r", "24", + "-c:a", "aac", "-loglevel", "error", str(good)) + assert r.returncode == 0, r.stderr + + assert ytdl.normalize(str(good)) is False # fast path: no re-encode diff --git a/ytdl.py b/ytdl.py new file mode 100644 index 0000000..8af18d3 --- /dev/null +++ b/ytdl.py @@ -0,0 +1,287 @@ +""" +ytdl.py — Resolve YouTube (and other yt-dlp-supported) URLs to a local file +that the ASCILINE engine can ALWAYS open. + +ASCILINE downscales every frame to a tiny character grid, so there is no point +pulling high resolution. We cap at <=480p and produce a single mp4 with audio +(the /audio endpoint runs ffmpeg on the same file). Downloads are cached in +videos/ by video id so re-runs are instant. + +Robustness contract (why this file is more than a one-liner): + The engine reads frames with OpenCV (cv2.VideoCapture) and extracts audio with + ffmpeg. Both break on the files YouTube actually serves: + * Best-quality audio is often Opus/WebM. Muxed into mp4 it is a NON-STANDARD + file that OpenCV/ffmpeg can choke on -> /audio fails or playback crashes. + * Best-quality video is often VP9/AV1, which OpenCV cannot decode without + hardware support. + * YouTube content is frequently variable-frame-rate (VFR). The engine's whole + timing model assumes a single constant FPS (frame_t = 1/fps), so VFR makes + cv2's CAP_PROP_FPS unreliable and playback drifts / "the FPS count breaks". + So after download we PROBE the file with ffprobe and, unless it is already + H.264 + AAC + constant-frame-rate, we normalize it to exactly that. The engine + is left untouched: it only ever sees clean, canonical mp4s. +""" +import os +import sys +import json +import shutil +import importlib.util +import subprocess + +_URL_HINTS = ("http://", "https://", "youtube.com", "youtu.be") + +# What the engine can open without surprises. +_OK_VCODEC = "h264" +_OK_ACODEC = "aac" + +# Subprocess guards so a stuck source can never hang the server. +_DL_TIMEOUT = 900 # yt-dlp download of a <=480p clip +_PROBE_TIMEOUT = 60 # ffprobe / metadata reads +_ENCODE_TIMEOUT = 1800 # ffmpeg re-encode of a long video + + +def is_url(s: str) -> bool: + s = s.lower() + return s.startswith(("http://", "https://")) or "youtube.com" in s or "youtu.be" in s + + +def _entry_url(entry: dict) -> str | None: + """Best-effort downloadable URL for a single flat-playlist entry.""" + url = entry.get("url") or entry.get("webpage_url") + if url and "/" in url: # already a full URL + return url + vid = entry.get("id") or url # bare video id (common for YouTube) + if vid: + return f"https://www.youtube.com/watch?v={vid}" + return None + + +def expand_playlist(url: str) -> list[str]: + """ + Expand a playlist/channel URL into a list of individual video URLs. + + Uses `--flat-playlist` so it only reads the index (no per-video download). + Returns ``[url]`` unchanged for a single video, or if expansion fails for + any reason, so the caller can still attempt a normal single download. + """ + _require_ytdlp() + res = _ytdlp("--flat-playlist", "-J", url, timeout=_PROBE_TIMEOUT) + if res.returncode != 0 or not res.stdout.strip(): + return [url] + try: + info = json.loads(res.stdout) + except json.JSONDecodeError: + return [url] + if info.get("_type") != "playlist": + return [url] + urls = [u for u in (_entry_url(e) for e in info.get("entries") or []) if u] + return urls or [url] + + +def _require_ytdlp() -> None: + """Fail early with an actionable message instead of a cryptic import error.""" + if importlib.util.find_spec("yt_dlp") is None: + raise RuntimeError("yt-dlp is not installed. Install it with: pip install yt-dlp") + + +def _ytdlp(*args: str, timeout: int = _DL_TIMEOUT) -> subprocess.CompletedProcess: + # Use the running interpreter's yt_dlp so it always matches the venv. + try: + return subprocess.run([sys.executable, "-m", "yt_dlp", *args], + capture_output=True, text=True, timeout=timeout) + except subprocess.TimeoutExpired: + raise RuntimeError(f"yt-dlp timed out after {timeout}s") + + +def _probe_remote(url: str) -> tuple[str, bool]: + """Return (video_id, is_live) for a single video URL, without downloading.""" + res = _ytdlp("--no-playlist", "--print", "id", "--print", "is_live", url, + timeout=_PROBE_TIMEOUT) + if res.returncode != 0 or not res.stdout.strip(): + raise RuntimeError(f"yt-dlp could not read {url!r}: {res.stderr.strip()[:200]}") + lines = res.stdout.strip().splitlines() + video_id = lines[0].strip() + is_live = len(lines) > 1 and lines[1].strip().lower() == "true" + return video_id, is_live + + +def download(url: str, cache_dir: str = "videos") -> str: + """Download `url` (<=480p) into cache_dir as a canonical mp4 and return the path.""" + _require_ytdlp() + os.makedirs(cache_dir, exist_ok=True) + + video_id, is_live = _probe_remote(url) + if is_live: + raise RuntimeError( + f"{url!r} is a live stream; ASCILINE plays finite videos only") + + out = os.path.join(cache_dir, f"{video_id}.mp4") + # A file at `out` is only ever created by the atomic rename below, so its + # mere existence guarantees a complete, already-normalized video. + if os.path.exists(out): + print(f"[YT] cached: {out}") + return out + + print(f"[YT] downloading {url} (<=480p) ...") + # Bias the selection toward a clean container in the first place: H.264 video + # (avc1) that OpenCV decodes everywhere, paired with AAC audio (mp4a) that is + # standard inside mp4. Each fallback widens what we accept; normalize() below + # repairs whatever the chosen format could not give us cleanly. + fmt = ("bv*[vcodec^=avc1][height<=480]+ba[acodec^=mp4a]/" # avc1 + aac -> clean + "bv*[vcodec^=avc1][height<=480]+ba/" # avc1 + any audio + "b[vcodec^=avc1][height<=480]/" # progressive avc1 + "bv*[height<=480]+ba/b[height<=480]/b") # last resort + + # Download + normalize into a temp file, then atomically rename. An + # interruption (crash, Ctrl-C, full disk) leaves only the temp file, never a + # half-written `out` that a later run would mistake for a good cache hit. + tmp = out + ".part.mp4" + _unlink(tmp) + try: + res = _ytdlp("--no-playlist", "-f", fmt, + "--merge-output-format", "mp4", "-o", tmp, url) + if res.returncode != 0 or not os.path.exists(tmp): + raise RuntimeError(f"yt-dlp download failed: {res.stderr.strip()[-300:]}") + normalize(tmp) + os.replace(tmp, out) # atomic finalize + except BaseException: + _unlink(tmp) + raise + + print(f"[YT] saved: {out}") + return out + + +def _unlink(path: str) -> None: + """Best-effort remove; never raises (used in cleanup paths).""" + try: + os.remove(path) + except OSError: + pass + + +# ── format normalization ──────────────────────────────────────────────────── + +def _probe(path: str) -> dict: + """ + Return {'vcodec','acodec','fps','cfr'} via ffprobe. + + fps is the *average* real frame rate (avg_frame_rate); cfr is True only when + the container's nominal rate (r_frame_rate) matches the average, i.e. the + file is genuinely constant-frame-rate and safe for the engine's timing model. + """ + info = {"vcodec": None, "acodec": None, "fps": None, "cfr": False} + if not shutil.which("ffprobe"): + return info + try: + res = subprocess.run( + ["ffprobe", "-v", "error", "-show_entries", + "stream=codec_type,codec_name,r_frame_rate,avg_frame_rate", + "-of", "json", path], + capture_output=True, text=True, timeout=_PROBE_TIMEOUT) + except subprocess.TimeoutExpired: + return info + if res.returncode != 0: + return info + try: + streams = json.loads(res.stdout).get("streams", []) + except json.JSONDecodeError: + return info + for st in streams: + if st.get("codec_type") == "video" and info["vcodec"] is None: + info["vcodec"] = st.get("codec_name") + r_fps = _ratio(st.get("r_frame_rate")) + a_fps = _ratio(st.get("avg_frame_rate")) + info["fps"] = a_fps or r_fps + # CFR when both rates are known and agree within rounding. + info["cfr"] = bool(r_fps and a_fps and abs(r_fps - a_fps) < 0.01) + elif st.get("codec_type") == "audio" and info["acodec"] is None: + info["acodec"] = st.get("codec_name") + return info + + +def _ratio(s: str | None) -> float | None: + """Parse ffprobe rationals like '30000/1001' -> 29.97; '0/0' -> None.""" + if not s or "/" not in s: + return None + num, den = s.split("/", 1) + try: + num_f, den_f = float(num), float(den) + except ValueError: + return None + return num_f / den_f if den_f else None + + +def normalize(path: str) -> bool: + """ + Ensure `path` is a canonical mp4 the engine can always open: + H.264 video + AAC audio (or no audio) at a constant frame rate. + + Fast path: if the file already satisfies the contract, do nothing and return + False. Otherwise transcode in place and return True. Re-encoding is the only + reliable way to repair VP9/AV1 video, Opus-in-mp4 audio, and VFR timing — a + plain remux cannot. + """ + info = _probe(path) + if info["vcodec"] is None: + # No decodable video stream — nothing ASCILINE can render. + what = "audio-only source" if info["acodec"] else "unreadable file" + raise RuntimeError(f"{path!r}: no video stream ({what})") + has_audio = info["acodec"] is not None + clean = (info["vcodec"] == _OK_VCODEC + and (not has_audio or info["acodec"] == _OK_ACODEC) + and info["cfr"]) + if clean and _decodable(path): + return False + + reason = [] + if info["vcodec"] != _OK_VCODEC: + reason.append(f"video={info['vcodec']}") + if has_audio and info["acodec"] != _OK_ACODEC: + reason.append(f"audio={info['acodec']}") + if not info["cfr"]: + reason.append("vfr") + print(f"[YT] normalizing ({', '.join(reason) or 'unreadable'}) -> H.264/AAC/CFR ...") + _transcode(path, fps=info["fps"], has_audio=has_audio) + return True + + +def _decodable(path: str) -> bool: + """True if OpenCV can actually read the first frame (last-ditch sanity check).""" + try: + import cv2 + except ImportError: + return True # can't check; assume fine + cap = cv2.VideoCapture(path) + ok, _ = cap.read() + cap.release() + return ok + + +def _transcode(path: str, fps: float | None, has_audio: bool) -> None: + """Transcode in place to H.264 + AAC at a constant frame rate.""" + if not shutil.which("ffmpeg"): + raise RuntimeError("ffmpeg not found; cannot normalize downloaded video") + tmp = path + ".norm.mp4" + # -fps_mode cfr + an explicit -r force a constant frame rate so the engine's + # 1/fps timing stays in sync; yuv420p keeps OpenCV/browsers happy. + rate = f"{fps:.6f}" if fps and fps > 0 else "30" + cmd = ["ffmpeg", "-y", "-i", path, + "-map", "0:v:0", + "-c:v", "libx264", "-preset", "veryfast", "-crf", "23", + "-pix_fmt", "yuv420p", "-r", rate, "-fps_mode", "cfr"] + if has_audio: + cmd += ["-map", "0:a:0", "-c:a", "aac", "-b:a", "128k"] + else: + cmd += ["-an"] + cmd += ["-movflags", "+faststart", "-loglevel", "error", tmp] + try: + res = subprocess.run(cmd, capture_output=True, text=True, + timeout=_ENCODE_TIMEOUT) + except subprocess.TimeoutExpired: + _unlink(tmp) + raise RuntimeError(f"normalize timed out after {_ENCODE_TIMEOUT}s") + if res.returncode != 0 or not os.path.exists(tmp): + _unlink(tmp) + raise RuntimeError(f"normalize failed: {res.stderr.strip()[-300:]}") + os.replace(tmp, path)