From 7fa761212e043f37c1de8ee219396ea967e1d7ed Mon Sep 17 00:00:00 2001 From: oneshot2001 Date: Sat, 13 Jun 2026 13:01:27 -0600 Subject: [PATCH] fix: top-3 review findings (decoder ordering, public-by-default, per-session state) Frontend (app.js): - Serialize the stateful adaptive-codec decoder through a promise chain. decode() awaits a real async DecompressionStream, so the previous concurrent .then() let a small DELTA resolve before its keyframe and patch a stale/null prev -> corrupt frames. Adds .catch + stale-decoder guard so a re-INIT drops in-flight frames from the previous segment. - Flush frameBuffer on INIT so playlist transitions don't stall the reset master clock on the previous video's tail frames (or render them under the new renderer on a mode change). - Request /audio?v= using the new INIT queue-index field so audio is correct when multiple clients are connected. Server (stream_server.py): - Bind 127.0.0.1 by default (--host to opt into LAN); same-origin Origin check before streaming (CSWSH defense that still allows LAN same-origin). - Scope /static to an app.js/style.css/codec.js whitelist (was serving the whole repo: source, playlist, any local .env/notes). - Per-session audio: INIT carries the queue index; /audio?v= reads it (bounds-checked) instead of the shared global current_index. - Validate/clamp playlist+CLI mode/vol/pixel/cols/rows; guard malformed playlist JSON. ffmpeg gets -nostdin + terminate/kill-with-timeout. - Re-enable WS keepalive (reap dead clients); release VideoCapture on the isOpened()-false path. Adds experiments/test_decode_order.js: dependency-free regression proving serialized decode is bit-exact + in-order and that delta-before-keyframe throws (no video fixtures needed). Server fixes built by Codex from a Claude spec; Claude integrated + reviewed (tightened the Origin check to same-origin so --host 0.0.0.0 LAN mode works). Co-Authored-By: Claude Opus 4.8 (1M context) --- app.js | 31 +++++- experiments/test_decode_order.js | 111 +++++++++++++++++++++ stream_server.py | 162 ++++++++++++++++++++++++------- 3 files changed, 266 insertions(+), 38 deletions(-) create mode 100644 experiments/test_decode_order.js diff --git a/app.js b/app.js index b02b03e..eee9ac5 100644 --- a/app.js +++ b/app.js @@ -21,6 +21,7 @@ let ws = null; const frameBuffer = []; const BUFFER_SIZE = 4; let codecDecoder = null; // Adaptive codec decoder (codec.js) +let decodeChain = Promise.resolve(); // serializes stateful codec decodes in arrival order let targetFps = 24; let frameInterval = 1000 / targetFps; let renderMode = 1; @@ -165,6 +166,9 @@ function connectWebSocket() { frameInterval = 1000 / targetFps; renderMode = parseInt(p[2]); pixelMode = (p.length > 5 && parseInt(p[5]) === 1); + // 7th field (added server-side) = current queue index, so /audio + // serves THIS client's video even when other clients are connected. + const videoIndex = (p.length > 6 && !Number.isNaN(parseInt(p[6]))) ? parseInt(p[6]) : 0; buildCanvas(parseInt(p[3]), parseInt(p[4])); // Initialize adaptive codec decoder (pixel=3 bytes, ASCII color=4 bytes) @@ -174,6 +178,13 @@ function connectWebSocket() { codecDecoder = null; } + // New segment (single video, or the next entry in a playlist): + // drop frames still buffered from the previous segment. Their + // timestamps belong to the old master clock (which resets when the + // audio reloads below), so keeping them stalls A/V sync — and on a + // mode change they'd be decoded under the wrong renderer. + frameBuffer.length = 0; + // ── AUDIO READY GATE ── // Buffer video frames but don't render until audio is ready. // This prevents the 0.5s initial stutter. @@ -190,7 +201,7 @@ function connectWebSocket() { if (audioEl) { audioEl.pause(); - audioEl.src = '/audio?' + Date.now(); + audioEl.src = '/audio?v=' + videoIndex + '&t=' + Date.now(); audioEl.volume = volumeSlider ? volumeSlider.value : 1.0; audioEl.load(); audioEl.play().catch(() => {}); @@ -222,10 +233,20 @@ function connectWebSocket() { } else { // Binary Frames — decoded via adaptive codec (raw/zlib/delta) if (codecDecoder) { - codecDecoder.decode(event.data).then(({ frameIndex, frame }) => { - const frameTime = frameIndex / targetFps; - frameBuffer.push({ data: frame, time: frameTime }); - }); + // The codec is STATEFUL: a DELTA frame patches the previously + // decoded frame, so decodes MUST run in arrival order. decode() + // awaits a real async DecompressionStream, so firing them + // concurrently lets a small DELTA resolve before an earlier + // keyframe and patch a stale frame. Serialize through a chain. + const data = event.data; + const dec = codecDecoder; + decodeChain = decodeChain.then(async () => { + if (dec !== codecDecoder) return; // stream re-INIT'd → drop stale frame + const { frameIndex, frame } = await dec.decode(data); + frameBuffer.push({ data: frame, time: frameIndex / targetFps }); + // Cap here (not only in the sync path) since this push is async. + while (frameBuffer.length > BUFFER_SIZE * 5) frameBuffer.shift(); + }).catch((err) => { console.error('ASCILINE decode error:', err); }); } else { // Fallback: legacy 4-byte header const buffer = event.data; diff --git a/experiments/test_decode_order.js b/experiments/test_decode_order.js new file mode 100644 index 0000000..1a7eebe --- /dev/null +++ b/experiments/test_decode_order.js @@ -0,0 +1,111 @@ +/** + * Codec decode-order regression test — dependency-free (no video fixtures). + * + * Encodes a keyframe + DELTA stream in the exact ASCILINE wire format using + * Node's zlib (RFC1950, the same format Python's zlib.compress emits), then: + * 1. decodes it through a SERIALIZED chain (the shipped app.js pattern) and + * asserts every frame is byte-exact AND in order, and + * 2. shows a DELTA decoded before its keyframe throws — i.e. arrival order is + * load-bearing, which is exactly why the decoder must be serialized. + * + * Usage: node experiments/test_decode_order.js + */ +const codec = require('../codec.js'); +const zlib = require('zlib'); + +const ROWS = 16, COLS = 16, C = 4, CELLS = ROWS * COLS, FB = CELLS * C; +const N = 12, KEYFRAME_INTERVAL = 48; + +function be32(n) { const b = Buffer.alloc(4); b.writeUInt32BE(n >>> 0, 0); return b; } +function le32(n) { const b = Buffer.alloc(4); b.writeUInt32LE(n >>> 0, 0); return b; } +function ab(buf) { return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength); } + +// Deterministic frames: a varied (poorly-compressible) frame 0, then a few +// changed cells per frame so DELTA decisively beats full-frame zlib. +function makeFrames() { + const frames = []; + const f0 = new Uint8Array(FB); + for (let i = 0; i < FB; i++) f0[i] = (i * 7 + (i >> 2) * 13) & 0xff; + frames.push(f0); + for (let i = 1; i < N; i++) { + const nf = frames[i - 1].slice(); + for (let j = 0; j < 2; j++) { + const cell = (i * 5 + j * 37) % CELLS; + for (let b = 0; b < C; b++) nf[cell * C + b] = (i * 31 + cell * 17 + b * 11) & 0xff; + } + frames.push(nf); + } + return frames; +} + +// Mirror of codec.py's encoder (lossless): smallest of RAW / ZLIB / DELTA. +function encode(frames) { + const msgs = []; let prev = null; + for (let i = 0; i < frames.length; i++) { + const raw = Buffer.from(frames[i]); + if (prev === null || i % KEYFRAME_INTERVAL === 0) { + const z = zlib.deflateSync(raw); + const tag = z.length < raw.length ? 1 : 0; + msgs.push(Buffer.concat([be32(i), Buffer.from([tag]), tag === 1 ? z : raw])); + prev = frames[i]; continue; + } + const idxs = []; + for (let c = 0; c < CELLS; c++) { + for (let b = 0; b < C; b++) { + if (frames[i][c * C + b] !== prev[c * C + b]) { idxs.push(c); break; } + } + } + const body = Buffer.concat([ + ...idxs.map(le32), + ...idxs.map(c => Buffer.from(frames[i].slice(c * C, c * C + C))), + ]); + const dz = zlib.deflateSync(body); + const fz = zlib.deflateSync(raw); + let tag, payload; + if (dz.length <= fz.length) { tag = 2; payload = dz; } else { tag = 1; payload = fz; } + if (raw.length < payload.length) { tag = 0; payload = raw; } + msgs.push(Buffer.concat([be32(i), Buffer.from([tag]), payload])); + prev = frames[i]; + } + return msgs; +} + +async function decodeSerial(msgs, cellBytes) { + const dec = codec.makeDecoder(cellBytes); + const out = []; + let chain = Promise.resolve(); + for (const m of msgs) { + chain = chain.then(async () => { + const { frameIndex, frame } = await dec.decode(ab(m)); + out.push({ frameIndex, frame }); + }); + } + await chain; + return out; +} + +(async () => { + const frames = makeFrames(); + const msgs = encode(frames); + const tags = msgs.map(m => m[4]).join(''); + console.log(`frames: ${frames.length} tags: ${tags} (0=RAW 1=ZLIB 2=DELTA)`); + if (!tags.includes('2')) { console.error('FAIL: no DELTA frames produced — test is not exercising the stateful path'); process.exit(1); } + + const out = await decodeSerial(msgs, C); + let bad = 0; + for (let i = 0; i < frames.length; i++) { + if (out[i].frameIndex !== i) { bad++; console.log(`order FAIL at ${i}: got index ${out[i].frameIndex}`); continue; } + const got = out[i].frame, want = frames[i]; + if (got.length !== want.length) { bad++; console.log(`len FAIL frame ${i}`); continue; } + for (let j = 0; j < want.length; j++) if (got[j] !== want[j]) { bad++; console.log(`byte FAIL frame ${i} @ ${j}`); break; } + } + console.log(`serialized decode : ${bad === 0 ? 'PASS (bit-exact + in-order)' : 'FAIL (' + bad + ')'}`); + + // A DELTA decoded with no prior keyframe must fail — proving order matters. + let threw = false; + try { await codec.makeDecoder(C).decode(ab(msgs.find(m => m[4] === 2))); } + catch { threw = true; } + console.log(`order load-bearing: ${threw ? 'YES — delta-before-keyframe throws (this is what the concurrency bug caused)' : 'NO'}`); + + process.exit(bad === 0 && threw ? 0 : 1); +})().catch(e => { console.error('ERROR', e); process.exit(2); }); diff --git a/stream_server.py b/stream_server.py index 55d9184..e17274d 100644 --- a/stream_server.py +++ b/stream_server.py @@ -15,11 +15,11 @@ import subprocess import json import numpy as np import cv2 -from fastapi import FastAPI, WebSocket, WebSocketDisconnect -from fastapi.responses import HTMLResponse, StreamingResponse -from fastapi.staticfiles import StaticFiles +from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect +from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse import uvicorn import os +from urllib.parse import urlparse from websockets.exceptions import ConnectionClosed # Import the existing engine (ascii_video_player2.py) @@ -32,12 +32,14 @@ app = FastAPI() def get_video_dimensions(path: str) -> tuple[int, int]: """Quickly probe a video file to get (width, height) without decoding frames.""" cap = cv2.VideoCapture(path) - if not cap.isOpened(): - raise FileNotFoundError(f"Could not open video file: {path!r}") - w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - cap.release() - return w, h + try: + if not cap.isOpened(): + raise FileNotFoundError(f"Could not open video file: {path!r}") + w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + return w, h + finally: + cap.release() def calc_auto_rows(cols: int, vid_w: int, vid_h: int, pixel_mode: bool) -> int: @@ -52,9 +54,18 @@ def calc_auto_rows(cols: int, vid_w: int, vid_h: int, pixel_mode: bool) -> int: else: return max(1, round(cols / ratio / 2)) -# Serve static files (style.css, app.js) from the project directory BASE_DIR = os.path.dirname(os.path.abspath(__file__)) -app.mount("/static", StaticFiles(directory=BASE_DIR), name="static") +STATIC_FILES = {"app.js", "style.css", "codec.js"} + + +@app.get("/static/{filename}") +async def static_file(filename: str): + if filename not in STATIC_FILES: + raise HTTPException(status_code=404) + path = os.path.join(BASE_DIR, filename) + if not os.path.isfile(path): + raise HTTPException(status_code=404) + return FileResponse(path) def get_html_content(): html_path = os.path.join(os.path.dirname(__file__), "index.html") @@ -79,11 +90,85 @@ def resolve_video_path(video: str) -> str: return path return video # Return original; error will be caught during playback +def _coerce_int(value, default: int) -> int: + if isinstance(value, bool): + return default + try: + return int(value) + except (TypeError, ValueError): + return default + +def _coerce_bool(value, default: bool) -> bool: + if isinstance(value, bool): + return value + if isinstance(value, str): + value = value.strip().lower() + if value in ("1", "true", "yes", "on"): + return True + if value in ("0", "false", "no", "off"): + return False + return default + if isinstance(value, (int, float)): + return bool(value) + return default + +def normalize_queue_entry( + entry: dict, + default_mode: int, + default_vol: int, + default_pixel: bool, + default_cols: int, + default_rows: int, +) -> dict: + """Normalizes per-video settings before playback allocates buffers.""" + mode = _coerce_int(entry.get("mode", default_mode), 1) + if mode not in {1, 2, 3, 4, 5}: + mode = 1 + + vol = max(0, min(5, _coerce_int(entry.get("vol", default_vol), default_vol))) + pixel = _coerce_bool(entry.get("pixel", default_pixel), default_pixel) + cols = max(1, min(1000, _coerce_int(entry.get("cols", default_cols), default_cols))) + rows = max(0, min(1000, _coerce_int(entry.get("rows", default_rows), default_rows))) + + entry["mode"] = mode + entry["vol"] = vol + entry["pixel"] = pixel + entry["cols"] = cols + entry["rows"] = rows + return entry + +def _origin_allowed(origin: str | None, host_header: str | None = None) -> bool: + if not origin: + return True # non-browser clients / test harness send no Origin + try: + origin_host = urlparse(origin).hostname + except ValueError: + return False + if origin_host in {"localhost", "127.0.0.1"}: + return True + # Same-origin: the page was served by THIS server. Covers LAN mode + # (--host 0.0.0.0), where the Origin host is the server's own LAN IP, not + # localhost — while still rejecting a cross-site page whose Origin won't + # match the Host the victim's browser connected to. + if host_header and origin_host == host_header.split(":")[0]: + return True + return False + def load_playlist(playlist_path: str) -> list[dict]: """Loads playlist from a JSON file and resolves all video paths.""" - with open(playlist_path, "r", encoding="utf-8") as f: - items = json.load(f) - for item in items: + try: + with open(playlist_path, "r", encoding="utf-8") as f: + items = json.load(f) + except (OSError, json.JSONDecodeError) as exc: + print(f"[ERROR] Could not load playlist {playlist_path!r}: {exc}") + exit(1) + if not isinstance(items, list): + print("[ERROR] Playlist must be a JSON list of entries.") + exit(1) + for i, item in enumerate(items, 1): + if not isinstance(item, dict) or not isinstance(item.get("video"), str) or not item.get("video"): + print(f"[ERROR] Playlist entry {i} is missing a valid 'video' field.") + exit(1) item["video"] = resolve_video_path(item["video"]) return items @@ -115,16 +200,10 @@ def build_queue(args) -> list[dict]: if args.playlist: print(f"[PLAYLIST] Loading: {args.playlist}") items = load_playlist(args.playlist) - # Fill missing fields with global defaults for item in items: - item.setdefault("mode", args.mode) - item.setdefault("vol", args.vol) - item.setdefault("pixel", args.pixel) - - is_pixel = item.get("pixel", False) + is_pixel = _coerce_bool(item.get("pixel", args.pixel), args.pixel) default_cols = args.cols if args.cols is not None else (450 if is_pixel else 200) - item.setdefault("cols", default_cols) - item.setdefault("rows", args.rows) + normalize_queue_entry(item, args.mode, args.vol, args.pixel, default_cols, args.rows) return items if args.folder: @@ -132,14 +211,20 @@ def build_queue(args) -> list[dict]: items = load_folder(args.folder, args.mode, args.vol) default_cols = args.cols if args.cols is not None else (450 if args.pixel else 200) for item in items: - item["pixel"] = args.pixel - item["cols"] = default_cols - item["rows"] = args.rows + normalize_queue_entry(item, args.mode, args.vol, args.pixel, default_cols, args.rows) return items # Legacy: single video argument default_cols = args.cols if args.cols is not None else (450 if args.pixel else 200) - return [{"video": resolve_video_path(args.video), "mode": args.mode, "vol": args.vol, "pixel": args.pixel, "cols": default_cols, "rows": args.rows}] + entry = { + "video": resolve_video_path(args.video), + "mode": args.mode, + "vol": args.vol, + "pixel": args.pixel, + "cols": default_cols, + "rows": args.rows, + } + return [normalize_queue_entry(entry, args.mode, args.vol, args.pixel, default_cols, args.rows)] # ── APP STATE ────────────────────────────────────────────── @@ -155,7 +240,7 @@ async def root(): @app.get("/audio") -async def audio_stream(): +async def audio_stream(v: int | None = None): """ Extracts and streams audio from the currently active video entry. Server-side volume control via the entry's 'vol' field (0-5 scale). @@ -165,7 +250,9 @@ async def audio_stream(): """ queue = getattr(app.state, "queue", []) idx = getattr(app.state, "current_index", 0) - entry = queue[idx] if queue else {} + if v is not None and 0 <= v < len(queue): + idx = v + entry = queue[idx] if queue and 0 <= idx < len(queue) else {} vol_level = entry.get("vol", 1) video_path = entry.get("video", "video.mp4") @@ -186,6 +273,7 @@ async def audio_stream(): process = subprocess.Popen( [ "ffmpeg", + "-nostdin", "-i", video_path, "-vn", "-filter:a", f"volume={ffmpeg_vol}", @@ -207,7 +295,12 @@ async def audio_stream(): yield chunk finally: process.stdout.close() - process.wait() + try: + process.terminate() + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + process.wait() return StreamingResponse( audio_generator(), @@ -229,6 +322,10 @@ async def websocket_endpoint(websocket: WebSocket): # the original uncompressed binary protocol, byte-for-byte unchanged. adaptive = websocket.query_params.get("codec") == "adaptive" tolerance = getattr(app.state, "tolerance", 0) # lossy colour drift budget + origin = websocket.headers.get("origin") + if not _origin_allowed(origin, websocket.headers.get("host")): + await websocket.close(code=1008) + return queue = getattr(app.state, "queue", []) loop = getattr(app.state, "loop", False) @@ -305,7 +402,7 @@ async def websocket_endpoint(websocket: WebSocket): effective_fps = source_fps frame_t = 1.0 / effective_fps - await websocket.send_text(f"INIT:{effective_fps}:{render_mode}:{cols}:{rows}:{int(pixel_mode)}") + await websocket.send_text(f"INIT:{effective_fps}:{render_mode}:{cols}:{rows}:{int(pixel_mode)}:{queue_index}") if skip_n > 1: print(f"[FPS CAP] {source_fps} FPS → {effective_fps} FPS (skip every {skip_n} frames)") @@ -587,6 +684,7 @@ if __name__ == "__main__": # ── Server ── srv = parser.add_argument_group('\033[33mServer\033[0m') + srv.add_argument("--host", default="127.0.0.1", help="Bind address (default 127.0.0.1; use 0.0.0.0 to expose on LAN)") srv.add_argument("--port", type=int, default=8000, help="Server port (default: 8000)") srv.add_argument("--debug", action="store_true", default=False, help="Enable bandwidth debug logging (RAW vs WIRE)") @@ -661,10 +759,8 @@ if __name__ == "__main__": target=uvicorn.run, args=(app,), kwargs={ - "host": "0.0.0.0", + "host": args.host, "port": args.port, - "ws_ping_interval": None, - "ws_ping_timeout": None, "log_level": "warning", }, daemon=True