feat(#5): play YouTube/yt-dlp URLs and playlists, rebased on new pipeline

Pass any yt-dlp-supported URL where a video file goes. A single video is
downloaded (<=480p, ASCILINE only needs a tiny grid), normalized to
H.264/AAC/CFR mp4, and cached in videos/ by id so replays and --loop are
instant. A playlist/channel URL expands into one queue entry per video,
each fetched lazily as it plays so a long playlist starts immediately.

Also fixes the playlist.json eager-download bug Yusuf reported: URL entries
in a JSON playlist are now left unresolved by load_playlist and fetched
on demand by the playback loop, instead of synchronously downloading every
link before the server starts.

Ported onto the new thread-pool/zero-copy main; all integration points
(resolve_video_path, load_playlist, build_queue, websocket loop) updated.
This commit is contained in:
Nate 2026-06-19 14:16:46 -04:00
parent 51efdaf39f
commit bfbf94637b
6 changed files with 615 additions and 4 deletions

View file

@ -91,6 +91,13 @@ cd ASCILINE
```bash
pip install fastapi uvicorn opencv-python numpy websockets
```
**Optional — play from YouTube (and other yt-dlp sites):**
```bash
pip install yt-dlp
```
Only needed if you pass a URL instead of a local file. Local playback works
without it. URL playback also uses FFmpeg (see below) to normalize downloads.
### 🔈 Audio Support (FFmpeg Required)
To enable server-side audio processing (Volume 1-5), you must have FFmpeg installed.
@ -111,6 +118,19 @@ If you get a `FileNotFoundError` or don't want to modify system variables:
python stream_server.py video.mp4 --cols 240
```
**YouTube / URL (requires `yt-dlp`):** pass any yt-dlp-supported URL in place of a file.
```bash
python stream_server.py "https://youtu.be/VIDEO_ID" --cols 240
python stream_server.py "https://www.youtube.com/playlist?list=..." --cols 220 --loop
```
A single video is downloaded (≤480p — ASCILINE only needs a tiny grid) and cached
in `videos/` by id, so replays are instant. A playlist/channel URL expands into one
queue entry per video, each fetched on demand as it plays. A `playlist.json` may
also list URLs; those are fetched lazily too, so the server starts immediately
instead of downloading the whole list up front. Every download is normalized to a
standard H.264/AAC/constant-frame-rate mp4 so playback and audio stay reliable
regardless of the source codec.
**Folder mode — drop your videos into `videos/` and run:**
```bash
python stream_server.py --folder videos --cols 200

View file

@ -26,6 +26,7 @@ from websockets.exceptions import ConnectionClosed
# Import the existing engine (ascii_video_player2.py)
from ascii_video_player2 import VideoDecoder, AsciiMapper
from codec import encode_frame
import ytdl
app = FastAPI()
@ -73,11 +74,15 @@ def get_html_content():
def resolve_video_path(video: str) -> str:
"""
Resolves a video path by checking multiple locations in order:
0. If it's a URL (YouTube, etc.) -> download via yt-dlp and use that file
1. As-is (absolute or relative to CWD)
2. Inside the project root (BASE_DIR)
3. Inside BASE_DIR/videos/ subfolder
Returns the first path that exists, or the original string if none found.
"""
if ytdl.is_url(video):
return ytdl.download(video, cache_dir=os.path.join(BASE_DIR, "videos"))
candidates = [
video,
os.path.join(BASE_DIR, video),
@ -89,11 +94,19 @@ def resolve_video_path(video: str) -> str:
return video # Return original; error will be caught during playback
def load_playlist(playlist_path: str) -> list[dict]:
"""Loads playlist from a JSON file and resolves all video paths."""
"""
Loads a playlist from a JSON file and resolves local video paths.
URL entries (YouTube, etc.) are left unresolved on purpose: resolving a
URL means downloading it, and eagerly downloading every link would block
startup until the whole playlist is on disk. Unresolved URLs are fetched
lazily by the playback loop the first time each one is about to play.
"""
with open(playlist_path, "r", encoding="utf-8") as f:
items = json.load(f)
for item in items:
item["video"] = resolve_video_path(item["video"])
if not ytdl.is_url(item["video"]):
item["video"] = resolve_video_path(item["video"])
return items
def load_folder(folder_path: str, default_mode: int, default_vol: int) -> list[dict]:
@ -146,9 +159,23 @@ def build_queue(args) -> list[dict]:
item["rows"] = args.rows
return items
# Legacy: single video argument
# Single positional argument: a local file/path, or a URL.
# A URL may be a playlist/channel → expand it into one entry per video.
default_cols = args.cols if args.cols is not None else (450 if args.pixel else 200)
return [{"video": resolve_video_path(args.video), "mode": args.mode, "vol": args.vol, "pixel": args.pixel, "cols": default_cols, "rows": args.rows}]
base = {"mode": args.mode, "vol": args.vol, "pixel": args.pixel,
"cols": default_cols, "rows": args.rows}
if ytdl.is_url(args.video):
urls = ytdl.expand_playlist(args.video)
if len(urls) > 1:
print(f"[YT] playlist expanded → {len(urls)} videos "
f"(each downloaded on demand as it plays)")
# Keep URLs unresolved; the playback loop downloads each lazily so a
# long playlist doesn't block startup, and the cache makes replays
# (and --loop) instant.
return [{"video": u, **base} for u in urls]
return [{"video": resolve_video_path(args.video), **base}]
# ── APP STATE ──────────────────────────────────────────────
@ -386,6 +413,27 @@ async def websocket_endpoint(websocket: WebSocket):
while True:
entry = queue[queue_index]
video_path = entry["video"]
# Lazy resolve: an unresolved URL entry (a single URL, an expanded
# playlist item, or a URL from a playlist.json) is downloaded the
# first time it is about to play, then the local path is cached back
# into the queue so /audio and any --loop replay reuse the file
# instead of re-downloading.
if ytdl.is_url(video_path):
print(f"[YT] fetching ({queue_index + 1}/{len(queue)}) {video_path}")
try:
video_path = resolve_video_path(video_path)
entry["video"] = video_path
except Exception as e:
await websocket.send_text(f"Error: could not fetch '{video_path}': {e}")
queue_index += 1
if queue_index >= len(queue):
if loop:
queue_index = 0
else:
break
continue
render_mode= entry["mode"]
pixel_mode = entry.get("pixel", False)
cols = entry.get("cols", 200)

63
test_ytdl.py Normal file
View file

@ -0,0 +1,63 @@
"""Unit tests for ytdl.expand_playlist parsing (no network — _ytdlp is mocked)."""
import json
import subprocess
from unittest import mock
import ytdl
def _cp(stdout="", returncode=0, stderr=""):
return subprocess.CompletedProcess(args=[], returncode=returncode,
stdout=stdout, stderr=stderr)
def test_expand_playlist_returns_each_entry_url():
info = {
"_type": "playlist",
"entries": [
{"id": "aaa", "url": "https://www.youtube.com/watch?v=aaa"},
{"id": "bbb", "url": "https://www.youtube.com/watch?v=bbb"},
],
}
with mock.patch.object(ytdl, "_ytdlp", return_value=_cp(json.dumps(info))):
assert ytdl.expand_playlist("https://youtube.com/playlist?list=PL") == [
"https://www.youtube.com/watch?v=aaa",
"https://www.youtube.com/watch?v=bbb",
]
def test_expand_playlist_builds_url_from_bare_id():
# Some yt-dlp versions emit a bare id in 'url' for flat entries.
info = {"_type": "playlist", "entries": [{"id": "xyz", "url": "xyz"}]}
with mock.patch.object(ytdl, "_ytdlp", return_value=_cp(json.dumps(info))):
assert ytdl.expand_playlist("https://youtube.com/playlist?list=PL") == [
"https://www.youtube.com/watch?v=xyz",
]
def test_expand_playlist_single_video_returns_input_unchanged():
info = {"_type": "video", "id": "single"}
url = "https://youtu.be/single"
with mock.patch.object(ytdl, "_ytdlp", return_value=_cp(json.dumps(info))):
assert ytdl.expand_playlist(url) == [url]
def test_expand_playlist_falls_back_on_ytdlp_failure():
url = "https://youtu.be/whatever"
with mock.patch.object(ytdl, "_ytdlp", return_value=_cp("", returncode=1, stderr="boom")):
assert ytdl.expand_playlist(url) == [url]
def test_expand_playlist_falls_back_on_bad_json():
url = "https://youtu.be/whatever"
with mock.patch.object(ytdl, "_ytdlp", return_value=_cp("not json{")):
assert ytdl.expand_playlist(url) == [url]
def test_expand_playlist_skips_unusable_entries():
info = {"_type": "playlist", "entries": [{"title": "no id or url"},
{"id": "ok"}]}
with mock.patch.object(ytdl, "_ytdlp", return_value=_cp(json.dumps(info))):
assert ytdl.expand_playlist("https://youtube.com/playlist?list=PL") == [
"https://www.youtube.com/watch?v=ok",
]

80
test_ytdl_hardening.py Normal file
View file

@ -0,0 +1,80 @@
"""
Hardening tests for ytdl: missing dependency, livestreams, atomic caching,
and audio-only sources. All but one are offline (yt-dlp/ffprobe are mocked).
"""
import shutil
import subprocess
from unittest import mock
import pytest
import ytdl
def _cp(stdout="", returncode=0, stderr=""):
return subprocess.CompletedProcess(args=[], returncode=returncode,
stdout=stdout, stderr=stderr)
def test_missing_ytdlp_gives_actionable_error(tmp_path):
with mock.patch("importlib.util.find_spec", return_value=None):
with pytest.raises(RuntimeError, match="pip install yt-dlp"):
ytdl.download("https://youtu.be/x", cache_dir=str(tmp_path))
def test_download_rejects_livestream(tmp_path):
# _probe_remote sees id on line 1, is_live=True on line 2.
with mock.patch("importlib.util.find_spec", return_value=object()), \
mock.patch.object(ytdl, "_ytdlp", return_value=_cp("vid123\nTrue\n")):
with pytest.raises(RuntimeError, match="live stream"):
ytdl.download("https://youtu.be/live", cache_dir=str(tmp_path))
def test_download_is_atomic_on_normalize_failure(tmp_path):
"""A failed normalize must leave no cache file a later run would trust."""
out = tmp_path / "vid123.mp4"
def fake_ytdlp(*args, **kwargs):
if "is_live" in args: # _probe_remote
return _cp("vid123\nFalse\n")
if "-o" in args: # the download itself
target = args[args.index("-o") + 1]
with open(target, "wb") as f: # simulate a downloaded file
f.write(b"\x00\x00")
return _cp("ok")
return _cp("")
with mock.patch("importlib.util.find_spec", return_value=object()), \
mock.patch.object(ytdl, "_ytdlp", side_effect=fake_ytdlp), \
mock.patch.object(ytdl, "normalize", side_effect=RuntimeError("boom")):
with pytest.raises(RuntimeError, match="boom"):
ytdl.download("https://youtu.be/x", cache_dir=str(tmp_path))
assert not out.exists() # no poisoned cache
assert not (tmp_path / "vid123.mp4.part.mp4").exists() # temp cleaned up
def test_cached_file_short_circuits_without_download(tmp_path):
out = tmp_path / "vid123.mp4"
out.write_bytes(b"already here")
def fake_ytdlp(*args, **kwargs):
if "is_live" in args:
return _cp("vid123\nFalse\n")
raise AssertionError("must not download when cached")
with mock.patch("importlib.util.find_spec", return_value=object()), \
mock.patch.object(ytdl, "_ytdlp", side_effect=fake_ytdlp):
assert ytdl.download("https://youtu.be/x", cache_dir=str(tmp_path)) == str(out)
@pytest.mark.skipif(not shutil.which("ffmpeg"), reason="ffmpeg required")
def test_normalize_rejects_audio_only(tmp_path):
audio = tmp_path / "audio_only.mp4"
r = subprocess.run(
["ffmpeg", "-y", "-f", "lavfi", "-i", "sine=frequency=440:duration=1",
"-c:a", "aac", "-loglevel", "error", str(audio)],
capture_output=True, text=True)
assert r.returncode == 0, r.stderr
with pytest.raises(RuntimeError, match="no video stream"):
ytdl.normalize(str(audio))

113
test_ytdl_normalize.py Normal file
View file

@ -0,0 +1,113 @@
"""
Deterministic tests for ytdl.normalize() no network required.
We synthesize the exact kinds of files YouTube serves that broke the engine
(VP9 video + Opus audio inside an mp4 container, and variable-frame-rate video)
and assert that normalize() turns them into canonical H.264 / AAC / CFR mp4s the
engine can open and time correctly.
"""
import shutil
import subprocess
import cv2
import pytest
import ytdl
def _run(*args):
return subprocess.run(args, capture_output=True, text=True)
def _has_encoders(*names):
"""True only if ffmpeg exists and lists every requested encoder."""
if not shutil.which("ffmpeg"):
return False
out = _run("ffmpeg", "-hide_banner", "-encoders").stdout
return all(name in out for name in names)
# Building the *broken* inputs needs these encoders; the fix itself only needs
# libx264/aac. Skip cleanly on a minimal ffmpeg instead of failing CI.
requires_vp9_opus = pytest.mark.skipif(
not _has_encoders("libvpx-vp9", "libopus"),
reason="ffmpeg without libvpx-vp9/libopus; cannot synthesize the broken input")
requires_x264 = pytest.mark.skipif(
not _has_encoders("libx264"), reason="ffmpeg without libx264")
def _make_vp9_opus_mp4(path):
"""A VP9+Opus stream copied into an mp4 — non-standard, exactly what
`--merge-output-format mp4` produces from YouTube's 'best' streams."""
src = str(path) + ".src.mkv"
_run("ffmpeg", "-y",
"-f", "lavfi", "-i", "testsrc=size=320x240:rate=24:duration=1",
"-f", "lavfi", "-i", "sine=frequency=440:duration=1",
"-c:v", "libvpx-vp9", "-b:v", "200k", "-c:a", "libopus",
"-loglevel", "error", src)
# copy (not re-encode) into mp4 -> opus-in-mp4, the broken container
r = _run("ffmpeg", "-y", "-i", src, "-c", "copy", "-loglevel", "error", str(path))
assert r.returncode == 0, r.stderr
def _make_vfr_mp4(path):
"""An H.264 mp4 whose nominal rate disagrees with its average rate (VFR)."""
r = _run("ffmpeg", "-y",
"-f", "lavfi", "-i", "testsrc=size=320x240:rate=60:duration=1",
"-vf", "select='not(mod(n,3))'", # drop 2 of every 3 frames -> VFR
"-fps_mode", "vfr", "-c:v", "libx264", "-an",
"-loglevel", "error", str(path))
assert r.returncode == 0, r.stderr
def _audio_decodes(path):
r = _run("ffmpeg", "-v", "error", "-i", str(path), "-t", "0.5", "-f", "null", "-")
return r.returncode == 0 and "Invalid data" not in r.stderr
@requires_vp9_opus
def test_vp9_opus_mp4_is_repaired(tmp_path):
bad = tmp_path / "bad.mp4"
_make_vp9_opus_mp4(bad)
info = ytdl._probe(str(bad))
assert info["vcodec"] == "vp9" # confirms we built the broken input
assert info["acodec"] == "opus"
assert ytdl.normalize(str(bad)) is True # it had to be repaired
fixed = ytdl._probe(str(bad))
assert fixed["vcodec"] == "h264"
assert fixed["acodec"] == "aac"
assert fixed["cfr"] is True
assert _audio_decodes(bad) # /audio extraction now works
cap = cv2.VideoCapture(str(bad)) # OpenCV can open + read it
ok, _ = cap.read()
fps = cap.get(cv2.CAP_PROP_FPS)
cap.release()
assert ok
assert abs(fps - 24) < 0.5 # engine sees the real, stable FPS
@requires_x264
def test_vfr_is_made_constant(tmp_path):
bad = tmp_path / "vfr.mp4"
_make_vfr_mp4(bad)
assert ytdl._probe(str(bad))["cfr"] is False
assert ytdl.normalize(str(bad)) is True
assert ytdl._probe(str(bad))["cfr"] is True
@requires_x264
def test_clean_file_is_left_alone(tmp_path):
good = tmp_path / "good.mp4"
r = _run("ffmpeg", "-y",
"-f", "lavfi", "-i", "testsrc=size=320x240:rate=24:duration=1",
"-f", "lavfi", "-i", "sine=frequency=440:duration=1",
"-c:v", "libx264", "-pix_fmt", "yuv420p", "-r", "24",
"-c:a", "aac", "-loglevel", "error", str(good))
assert r.returncode == 0, r.stderr
assert ytdl.normalize(str(good)) is False # fast path: no re-encode

287
ytdl.py Normal file
View file

@ -0,0 +1,287 @@
"""
ytdl.py Resolve YouTube (and other yt-dlp-supported) URLs to a local file
that the ASCILINE engine can ALWAYS open.
ASCILINE downscales every frame to a tiny character grid, so there is no point
pulling high resolution. We cap at <=480p and produce a single mp4 with audio
(the /audio endpoint runs ffmpeg on the same file). Downloads are cached in
videos/ by video id so re-runs are instant.
Robustness contract (why this file is more than a one-liner):
The engine reads frames with OpenCV (cv2.VideoCapture) and extracts audio with
ffmpeg. Both break on the files YouTube actually serves:
* Best-quality audio is often Opus/WebM. Muxed into mp4 it is a NON-STANDARD
file that OpenCV/ffmpeg can choke on -> /audio fails or playback crashes.
* Best-quality video is often VP9/AV1, which OpenCV cannot decode without
hardware support.
* YouTube content is frequently variable-frame-rate (VFR). The engine's whole
timing model assumes a single constant FPS (frame_t = 1/fps), so VFR makes
cv2's CAP_PROP_FPS unreliable and playback drifts / "the FPS count breaks".
So after download we PROBE the file with ffprobe and, unless it is already
H.264 + AAC + constant-frame-rate, we normalize it to exactly that. The engine
is left untouched: it only ever sees clean, canonical mp4s.
"""
import os
import sys
import json
import shutil
import importlib.util
import subprocess
_URL_HINTS = ("http://", "https://", "youtube.com", "youtu.be")
# What the engine can open without surprises.
_OK_VCODEC = "h264"
_OK_ACODEC = "aac"
# Subprocess guards so a stuck source can never hang the server.
_DL_TIMEOUT = 900 # yt-dlp download of a <=480p clip
_PROBE_TIMEOUT = 60 # ffprobe / metadata reads
_ENCODE_TIMEOUT = 1800 # ffmpeg re-encode of a long video
def is_url(s: str) -> bool:
s = s.lower()
return s.startswith(("http://", "https://")) or "youtube.com" in s or "youtu.be" in s
def _entry_url(entry: dict) -> str | None:
"""Best-effort downloadable URL for a single flat-playlist entry."""
url = entry.get("url") or entry.get("webpage_url")
if url and "/" in url: # already a full URL
return url
vid = entry.get("id") or url # bare video id (common for YouTube)
if vid:
return f"https://www.youtube.com/watch?v={vid}"
return None
def expand_playlist(url: str) -> list[str]:
"""
Expand a playlist/channel URL into a list of individual video URLs.
Uses `--flat-playlist` so it only reads the index (no per-video download).
Returns ``[url]`` unchanged for a single video, or if expansion fails for
any reason, so the caller can still attempt a normal single download.
"""
_require_ytdlp()
res = _ytdlp("--flat-playlist", "-J", url, timeout=_PROBE_TIMEOUT)
if res.returncode != 0 or not res.stdout.strip():
return [url]
try:
info = json.loads(res.stdout)
except json.JSONDecodeError:
return [url]
if info.get("_type") != "playlist":
return [url]
urls = [u for u in (_entry_url(e) for e in info.get("entries") or []) if u]
return urls or [url]
def _require_ytdlp() -> None:
"""Fail early with an actionable message instead of a cryptic import error."""
if importlib.util.find_spec("yt_dlp") is None:
raise RuntimeError("yt-dlp is not installed. Install it with: pip install yt-dlp")
def _ytdlp(*args: str, timeout: int = _DL_TIMEOUT) -> subprocess.CompletedProcess:
# Use the running interpreter's yt_dlp so it always matches the venv.
try:
return subprocess.run([sys.executable, "-m", "yt_dlp", *args],
capture_output=True, text=True, timeout=timeout)
except subprocess.TimeoutExpired:
raise RuntimeError(f"yt-dlp timed out after {timeout}s")
def _probe_remote(url: str) -> tuple[str, bool]:
"""Return (video_id, is_live) for a single video URL, without downloading."""
res = _ytdlp("--no-playlist", "--print", "id", "--print", "is_live", url,
timeout=_PROBE_TIMEOUT)
if res.returncode != 0 or not res.stdout.strip():
raise RuntimeError(f"yt-dlp could not read {url!r}: {res.stderr.strip()[:200]}")
lines = res.stdout.strip().splitlines()
video_id = lines[0].strip()
is_live = len(lines) > 1 and lines[1].strip().lower() == "true"
return video_id, is_live
def download(url: str, cache_dir: str = "videos") -> str:
"""Download `url` (<=480p) into cache_dir as a canonical mp4 and return the path."""
_require_ytdlp()
os.makedirs(cache_dir, exist_ok=True)
video_id, is_live = _probe_remote(url)
if is_live:
raise RuntimeError(
f"{url!r} is a live stream; ASCILINE plays finite videos only")
out = os.path.join(cache_dir, f"{video_id}.mp4")
# A file at `out` is only ever created by the atomic rename below, so its
# mere existence guarantees a complete, already-normalized video.
if os.path.exists(out):
print(f"[YT] cached: {out}")
return out
print(f"[YT] downloading {url} (<=480p) ...")
# Bias the selection toward a clean container in the first place: H.264 video
# (avc1) that OpenCV decodes everywhere, paired with AAC audio (mp4a) that is
# standard inside mp4. Each fallback widens what we accept; normalize() below
# repairs whatever the chosen format could not give us cleanly.
fmt = ("bv*[vcodec^=avc1][height<=480]+ba[acodec^=mp4a]/" # avc1 + aac -> clean
"bv*[vcodec^=avc1][height<=480]+ba/" # avc1 + any audio
"b[vcodec^=avc1][height<=480]/" # progressive avc1
"bv*[height<=480]+ba/b[height<=480]/b") # last resort
# Download + normalize into a temp file, then atomically rename. An
# interruption (crash, Ctrl-C, full disk) leaves only the temp file, never a
# half-written `out` that a later run would mistake for a good cache hit.
tmp = out + ".part.mp4"
_unlink(tmp)
try:
res = _ytdlp("--no-playlist", "-f", fmt,
"--merge-output-format", "mp4", "-o", tmp, url)
if res.returncode != 0 or not os.path.exists(tmp):
raise RuntimeError(f"yt-dlp download failed: {res.stderr.strip()[-300:]}")
normalize(tmp)
os.replace(tmp, out) # atomic finalize
except BaseException:
_unlink(tmp)
raise
print(f"[YT] saved: {out}")
return out
def _unlink(path: str) -> None:
"""Best-effort remove; never raises (used in cleanup paths)."""
try:
os.remove(path)
except OSError:
pass
# ── format normalization ────────────────────────────────────────────────────
def _probe(path: str) -> dict:
"""
Return {'vcodec','acodec','fps','cfr'} via ffprobe.
fps is the *average* real frame rate (avg_frame_rate); cfr is True only when
the container's nominal rate (r_frame_rate) matches the average, i.e. the
file is genuinely constant-frame-rate and safe for the engine's timing model.
"""
info = {"vcodec": None, "acodec": None, "fps": None, "cfr": False}
if not shutil.which("ffprobe"):
return info
try:
res = subprocess.run(
["ffprobe", "-v", "error", "-show_entries",
"stream=codec_type,codec_name,r_frame_rate,avg_frame_rate",
"-of", "json", path],
capture_output=True, text=True, timeout=_PROBE_TIMEOUT)
except subprocess.TimeoutExpired:
return info
if res.returncode != 0:
return info
try:
streams = json.loads(res.stdout).get("streams", [])
except json.JSONDecodeError:
return info
for st in streams:
if st.get("codec_type") == "video" and info["vcodec"] is None:
info["vcodec"] = st.get("codec_name")
r_fps = _ratio(st.get("r_frame_rate"))
a_fps = _ratio(st.get("avg_frame_rate"))
info["fps"] = a_fps or r_fps
# CFR when both rates are known and agree within rounding.
info["cfr"] = bool(r_fps and a_fps and abs(r_fps - a_fps) < 0.01)
elif st.get("codec_type") == "audio" and info["acodec"] is None:
info["acodec"] = st.get("codec_name")
return info
def _ratio(s: str | None) -> float | None:
"""Parse ffprobe rationals like '30000/1001' -> 29.97; '0/0' -> None."""
if not s or "/" not in s:
return None
num, den = s.split("/", 1)
try:
num_f, den_f = float(num), float(den)
except ValueError:
return None
return num_f / den_f if den_f else None
def normalize(path: str) -> bool:
"""
Ensure `path` is a canonical mp4 the engine can always open:
H.264 video + AAC audio (or no audio) at a constant frame rate.
Fast path: if the file already satisfies the contract, do nothing and return
False. Otherwise transcode in place and return True. Re-encoding is the only
reliable way to repair VP9/AV1 video, Opus-in-mp4 audio, and VFR timing a
plain remux cannot.
"""
info = _probe(path)
if info["vcodec"] is None:
# No decodable video stream — nothing ASCILINE can render.
what = "audio-only source" if info["acodec"] else "unreadable file"
raise RuntimeError(f"{path!r}: no video stream ({what})")
has_audio = info["acodec"] is not None
clean = (info["vcodec"] == _OK_VCODEC
and (not has_audio or info["acodec"] == _OK_ACODEC)
and info["cfr"])
if clean and _decodable(path):
return False
reason = []
if info["vcodec"] != _OK_VCODEC:
reason.append(f"video={info['vcodec']}")
if has_audio and info["acodec"] != _OK_ACODEC:
reason.append(f"audio={info['acodec']}")
if not info["cfr"]:
reason.append("vfr")
print(f"[YT] normalizing ({', '.join(reason) or 'unreadable'}) -> H.264/AAC/CFR ...")
_transcode(path, fps=info["fps"], has_audio=has_audio)
return True
def _decodable(path: str) -> bool:
"""True if OpenCV can actually read the first frame (last-ditch sanity check)."""
try:
import cv2
except ImportError:
return True # can't check; assume fine
cap = cv2.VideoCapture(path)
ok, _ = cap.read()
cap.release()
return ok
def _transcode(path: str, fps: float | None, has_audio: bool) -> None:
"""Transcode in place to H.264 + AAC at a constant frame rate."""
if not shutil.which("ffmpeg"):
raise RuntimeError("ffmpeg not found; cannot normalize downloaded video")
tmp = path + ".norm.mp4"
# -fps_mode cfr + an explicit -r force a constant frame rate so the engine's
# 1/fps timing stays in sync; yuv420p keeps OpenCV/browsers happy.
rate = f"{fps:.6f}" if fps and fps > 0 else "30"
cmd = ["ffmpeg", "-y", "-i", path,
"-map", "0:v:0",
"-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
"-pix_fmt", "yuv420p", "-r", rate, "-fps_mode", "cfr"]
if has_audio:
cmd += ["-map", "0:a:0", "-c:a", "aac", "-b:a", "128k"]
else:
cmd += ["-an"]
cmd += ["-movflags", "+faststart", "-loglevel", "error", tmp]
try:
res = subprocess.run(cmd, capture_output=True, text=True,
timeout=_ENCODE_TIMEOUT)
except subprocess.TimeoutExpired:
_unlink(tmp)
raise RuntimeError(f"normalize timed out after {_ENCODE_TIMEOUT}s")
if res.returncode != 0 or not os.path.exists(tmp):
_unlink(tmp)
raise RuntimeError(f"normalize failed: {res.stderr.strip()[-300:]}")
os.replace(tmp, path)