fix: top-3 review findings (decoder ordering, public-by-default, per-session state)

Frontend (app.js):
- Serialize the stateful adaptive-codec decoder through a promise chain.
  decode() awaits a real async DecompressionStream, so the previous
  concurrent .then() let a small DELTA resolve before its keyframe and
  patch a stale/null prev -> corrupt frames. Adds .catch + stale-decoder
  guard so a re-INIT drops in-flight frames from the previous segment.
- Flush frameBuffer on INIT so playlist transitions don't stall the reset
  master clock on the previous video's tail frames (or render them under
  the new renderer on a mode change).
- Request /audio?v=<idx> using the new INIT queue-index field so audio is
  correct when multiple clients are connected.

Server (stream_server.py):
- Bind 127.0.0.1 by default (--host to opt into LAN); same-origin Origin
  check before streaming (CSWSH defense that still allows LAN same-origin).
- Scope /static to an app.js/style.css/codec.js whitelist (was serving the
  whole repo: source, playlist, any local .env/notes).
- Per-session audio: INIT carries the queue index; /audio?v= reads it
  (bounds-checked) instead of the shared global current_index.
- Validate/clamp playlist+CLI mode/vol/pixel/cols/rows; guard malformed
  playlist JSON. ffmpeg gets -nostdin + terminate/kill-with-timeout.
- Re-enable WS keepalive (reap dead clients); release VideoCapture on the
  isOpened()-false path.

Adds experiments/test_decode_order.js: dependency-free regression proving
serialized decode is bit-exact + in-order and that delta-before-keyframe
throws (no video fixtures needed).

Server fixes built by Codex from a Claude spec; Claude integrated + reviewed
(tightened the Origin check to same-origin so --host 0.0.0.0 LAN mode works).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
oneshot2001 2026-06-13 13:01:27 -06:00
parent 444334cfba
commit 7fa761212e
No known key found for this signature in database
3 changed files with 266 additions and 38 deletions

31
app.js
View file

@ -21,6 +21,7 @@ let ws = null;
const frameBuffer = [];
const BUFFER_SIZE = 4;
let codecDecoder = null; // Adaptive codec decoder (codec.js)
let decodeChain = Promise.resolve(); // serializes stateful codec decodes in arrival order
let targetFps = 24;
let frameInterval = 1000 / targetFps;
let renderMode = 1;
@ -165,6 +166,9 @@ function connectWebSocket() {
frameInterval = 1000 / targetFps;
renderMode = parseInt(p[2]);
pixelMode = (p.length > 5 && parseInt(p[5]) === 1);
// 7th field (added server-side) = current queue index, so /audio
// serves THIS client's video even when other clients are connected.
const videoIndex = (p.length > 6 && !Number.isNaN(parseInt(p[6]))) ? parseInt(p[6]) : 0;
buildCanvas(parseInt(p[3]), parseInt(p[4]));
// Initialize adaptive codec decoder (pixel=3 bytes, ASCII color=4 bytes)
@ -174,6 +178,13 @@ function connectWebSocket() {
codecDecoder = null;
}
// New segment (single video, or the next entry in a playlist):
// drop frames still buffered from the previous segment. Their
// timestamps belong to the old master clock (which resets when the
// audio reloads below), so keeping them stalls A/V sync — and on a
// mode change they'd be decoded under the wrong renderer.
frameBuffer.length = 0;
// ── AUDIO READY GATE ──
// Buffer video frames but don't render until audio is ready.
// This prevents the 0.5s initial stutter.
@ -190,7 +201,7 @@ function connectWebSocket() {
if (audioEl) {
audioEl.pause();
audioEl.src = '/audio?' + Date.now();
audioEl.src = '/audio?v=' + videoIndex + '&t=' + Date.now();
audioEl.volume = volumeSlider ? volumeSlider.value : 1.0;
audioEl.load();
audioEl.play().catch(() => {});
@ -222,10 +233,20 @@ function connectWebSocket() {
} else {
// Binary Frames — decoded via adaptive codec (raw/zlib/delta)
if (codecDecoder) {
codecDecoder.decode(event.data).then(({ frameIndex, frame }) => {
const frameTime = frameIndex / targetFps;
frameBuffer.push({ data: frame, time: frameTime });
});
// The codec is STATEFUL: a DELTA frame patches the previously
// decoded frame, so decodes MUST run in arrival order. decode()
// awaits a real async DecompressionStream, so firing them
// concurrently lets a small DELTA resolve before an earlier
// keyframe and patch a stale frame. Serialize through a chain.
const data = event.data;
const dec = codecDecoder;
decodeChain = decodeChain.then(async () => {
if (dec !== codecDecoder) return; // stream re-INIT'd → drop stale frame
const { frameIndex, frame } = await dec.decode(data);
frameBuffer.push({ data: frame, time: frameIndex / targetFps });
// Cap here (not only in the sync path) since this push is async.
while (frameBuffer.length > BUFFER_SIZE * 5) frameBuffer.shift();
}).catch((err) => { console.error('ASCILINE decode error:', err); });
} else {
// Fallback: legacy 4-byte header
const buffer = event.data;

View file

@ -0,0 +1,111 @@
/**
* Codec decode-order regression test dependency-free (no video fixtures).
*
* Encodes a keyframe + DELTA stream in the exact ASCILINE wire format using
* Node's zlib (RFC1950, the same format Python's zlib.compress emits), then:
* 1. decodes it through a SERIALIZED chain (the shipped app.js pattern) and
* asserts every frame is byte-exact AND in order, and
* 2. shows a DELTA decoded before its keyframe throws i.e. arrival order is
* load-bearing, which is exactly why the decoder must be serialized.
*
* Usage: node experiments/test_decode_order.js
*/
const codec = require('../codec.js');
const zlib = require('zlib');
const ROWS = 16, COLS = 16, C = 4, CELLS = ROWS * COLS, FB = CELLS * C;
const N = 12, KEYFRAME_INTERVAL = 48;
function be32(n) { const b = Buffer.alloc(4); b.writeUInt32BE(n >>> 0, 0); return b; }
function le32(n) { const b = Buffer.alloc(4); b.writeUInt32LE(n >>> 0, 0); return b; }
function ab(buf) { return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength); }
// Deterministic frames: a varied (poorly-compressible) frame 0, then a few
// changed cells per frame so DELTA decisively beats full-frame zlib.
function makeFrames() {
const frames = [];
const f0 = new Uint8Array(FB);
for (let i = 0; i < FB; i++) f0[i] = (i * 7 + (i >> 2) * 13) & 0xff;
frames.push(f0);
for (let i = 1; i < N; i++) {
const nf = frames[i - 1].slice();
for (let j = 0; j < 2; j++) {
const cell = (i * 5 + j * 37) % CELLS;
for (let b = 0; b < C; b++) nf[cell * C + b] = (i * 31 + cell * 17 + b * 11) & 0xff;
}
frames.push(nf);
}
return frames;
}
// Mirror of codec.py's encoder (lossless): smallest of RAW / ZLIB / DELTA.
function encode(frames) {
const msgs = []; let prev = null;
for (let i = 0; i < frames.length; i++) {
const raw = Buffer.from(frames[i]);
if (prev === null || i % KEYFRAME_INTERVAL === 0) {
const z = zlib.deflateSync(raw);
const tag = z.length < raw.length ? 1 : 0;
msgs.push(Buffer.concat([be32(i), Buffer.from([tag]), tag === 1 ? z : raw]));
prev = frames[i]; continue;
}
const idxs = [];
for (let c = 0; c < CELLS; c++) {
for (let b = 0; b < C; b++) {
if (frames[i][c * C + b] !== prev[c * C + b]) { idxs.push(c); break; }
}
}
const body = Buffer.concat([
...idxs.map(le32),
...idxs.map(c => Buffer.from(frames[i].slice(c * C, c * C + C))),
]);
const dz = zlib.deflateSync(body);
const fz = zlib.deflateSync(raw);
let tag, payload;
if (dz.length <= fz.length) { tag = 2; payload = dz; } else { tag = 1; payload = fz; }
if (raw.length < payload.length) { tag = 0; payload = raw; }
msgs.push(Buffer.concat([be32(i), Buffer.from([tag]), payload]));
prev = frames[i];
}
return msgs;
}
async function decodeSerial(msgs, cellBytes) {
const dec = codec.makeDecoder(cellBytes);
const out = [];
let chain = Promise.resolve();
for (const m of msgs) {
chain = chain.then(async () => {
const { frameIndex, frame } = await dec.decode(ab(m));
out.push({ frameIndex, frame });
});
}
await chain;
return out;
}
(async () => {
const frames = makeFrames();
const msgs = encode(frames);
const tags = msgs.map(m => m[4]).join('');
console.log(`frames: ${frames.length} tags: ${tags} (0=RAW 1=ZLIB 2=DELTA)`);
if (!tags.includes('2')) { console.error('FAIL: no DELTA frames produced — test is not exercising the stateful path'); process.exit(1); }
const out = await decodeSerial(msgs, C);
let bad = 0;
for (let i = 0; i < frames.length; i++) {
if (out[i].frameIndex !== i) { bad++; console.log(`order FAIL at ${i}: got index ${out[i].frameIndex}`); continue; }
const got = out[i].frame, want = frames[i];
if (got.length !== want.length) { bad++; console.log(`len FAIL frame ${i}`); continue; }
for (let j = 0; j < want.length; j++) if (got[j] !== want[j]) { bad++; console.log(`byte FAIL frame ${i} @ ${j}`); break; }
}
console.log(`serialized decode : ${bad === 0 ? 'PASS (bit-exact + in-order)' : 'FAIL (' + bad + ')'}`);
// A DELTA decoded with no prior keyframe must fail — proving order matters.
let threw = false;
try { await codec.makeDecoder(C).decode(ab(msgs.find(m => m[4] === 2))); }
catch { threw = true; }
console.log(`order load-bearing: ${threw ? 'YES — delta-before-keyframe throws (this is what the concurrency bug caused)' : 'NO'}`);
process.exit(bad === 0 && threw ? 0 : 1);
})().catch(e => { console.error('ERROR', e); process.exit(2); });

View file

@ -15,11 +15,11 @@ import subprocess
import json
import numpy as np
import cv2
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse
import uvicorn
import os
from urllib.parse import urlparse
from websockets.exceptions import ConnectionClosed
# Import the existing engine (ascii_video_player2.py)
@ -32,12 +32,14 @@ app = FastAPI()
def get_video_dimensions(path: str) -> tuple[int, int]:
"""Quickly probe a video file to get (width, height) without decoding frames."""
cap = cv2.VideoCapture(path)
if not cap.isOpened():
raise FileNotFoundError(f"Could not open video file: {path!r}")
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
return w, h
try:
if not cap.isOpened():
raise FileNotFoundError(f"Could not open video file: {path!r}")
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
return w, h
finally:
cap.release()
def calc_auto_rows(cols: int, vid_w: int, vid_h: int, pixel_mode: bool) -> int:
@ -52,9 +54,18 @@ def calc_auto_rows(cols: int, vid_w: int, vid_h: int, pixel_mode: bool) -> int:
else:
return max(1, round(cols / ratio / 2))
# Serve static files (style.css, app.js) from the project directory
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
app.mount("/static", StaticFiles(directory=BASE_DIR), name="static")
STATIC_FILES = {"app.js", "style.css", "codec.js"}
@app.get("/static/{filename}")
async def static_file(filename: str):
if filename not in STATIC_FILES:
raise HTTPException(status_code=404)
path = os.path.join(BASE_DIR, filename)
if not os.path.isfile(path):
raise HTTPException(status_code=404)
return FileResponse(path)
def get_html_content():
html_path = os.path.join(os.path.dirname(__file__), "index.html")
@ -79,11 +90,85 @@ def resolve_video_path(video: str) -> str:
return path
return video # Return original; error will be caught during playback
def _coerce_int(value, default: int) -> int:
if isinstance(value, bool):
return default
try:
return int(value)
except (TypeError, ValueError):
return default
def _coerce_bool(value, default: bool) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, str):
value = value.strip().lower()
if value in ("1", "true", "yes", "on"):
return True
if value in ("0", "false", "no", "off"):
return False
return default
if isinstance(value, (int, float)):
return bool(value)
return default
def normalize_queue_entry(
entry: dict,
default_mode: int,
default_vol: int,
default_pixel: bool,
default_cols: int,
default_rows: int,
) -> dict:
"""Normalizes per-video settings before playback allocates buffers."""
mode = _coerce_int(entry.get("mode", default_mode), 1)
if mode not in {1, 2, 3, 4, 5}:
mode = 1
vol = max(0, min(5, _coerce_int(entry.get("vol", default_vol), default_vol)))
pixel = _coerce_bool(entry.get("pixel", default_pixel), default_pixel)
cols = max(1, min(1000, _coerce_int(entry.get("cols", default_cols), default_cols)))
rows = max(0, min(1000, _coerce_int(entry.get("rows", default_rows), default_rows)))
entry["mode"] = mode
entry["vol"] = vol
entry["pixel"] = pixel
entry["cols"] = cols
entry["rows"] = rows
return entry
def _origin_allowed(origin: str | None, host_header: str | None = None) -> bool:
if not origin:
return True # non-browser clients / test harness send no Origin
try:
origin_host = urlparse(origin).hostname
except ValueError:
return False
if origin_host in {"localhost", "127.0.0.1"}:
return True
# Same-origin: the page was served by THIS server. Covers LAN mode
# (--host 0.0.0.0), where the Origin host is the server's own LAN IP, not
# localhost — while still rejecting a cross-site page whose Origin won't
# match the Host the victim's browser connected to.
if host_header and origin_host == host_header.split(":")[0]:
return True
return False
def load_playlist(playlist_path: str) -> list[dict]:
"""Loads playlist from a JSON file and resolves all video paths."""
with open(playlist_path, "r", encoding="utf-8") as f:
items = json.load(f)
for item in items:
try:
with open(playlist_path, "r", encoding="utf-8") as f:
items = json.load(f)
except (OSError, json.JSONDecodeError) as exc:
print(f"[ERROR] Could not load playlist {playlist_path!r}: {exc}")
exit(1)
if not isinstance(items, list):
print("[ERROR] Playlist must be a JSON list of entries.")
exit(1)
for i, item in enumerate(items, 1):
if not isinstance(item, dict) or not isinstance(item.get("video"), str) or not item.get("video"):
print(f"[ERROR] Playlist entry {i} is missing a valid 'video' field.")
exit(1)
item["video"] = resolve_video_path(item["video"])
return items
@ -115,16 +200,10 @@ def build_queue(args) -> list[dict]:
if args.playlist:
print(f"[PLAYLIST] Loading: {args.playlist}")
items = load_playlist(args.playlist)
# Fill missing fields with global defaults
for item in items:
item.setdefault("mode", args.mode)
item.setdefault("vol", args.vol)
item.setdefault("pixel", args.pixel)
is_pixel = item.get("pixel", False)
is_pixel = _coerce_bool(item.get("pixel", args.pixel), args.pixel)
default_cols = args.cols if args.cols is not None else (450 if is_pixel else 200)
item.setdefault("cols", default_cols)
item.setdefault("rows", args.rows)
normalize_queue_entry(item, args.mode, args.vol, args.pixel, default_cols, args.rows)
return items
if args.folder:
@ -132,14 +211,20 @@ def build_queue(args) -> list[dict]:
items = load_folder(args.folder, args.mode, args.vol)
default_cols = args.cols if args.cols is not None else (450 if args.pixel else 200)
for item in items:
item["pixel"] = args.pixel
item["cols"] = default_cols
item["rows"] = args.rows
normalize_queue_entry(item, args.mode, args.vol, args.pixel, default_cols, args.rows)
return items
# Legacy: single video argument
default_cols = args.cols if args.cols is not None else (450 if args.pixel else 200)
return [{"video": resolve_video_path(args.video), "mode": args.mode, "vol": args.vol, "pixel": args.pixel, "cols": default_cols, "rows": args.rows}]
entry = {
"video": resolve_video_path(args.video),
"mode": args.mode,
"vol": args.vol,
"pixel": args.pixel,
"cols": default_cols,
"rows": args.rows,
}
return [normalize_queue_entry(entry, args.mode, args.vol, args.pixel, default_cols, args.rows)]
# ── APP STATE ──────────────────────────────────────────────
@ -155,7 +240,7 @@ async def root():
@app.get("/audio")
async def audio_stream():
async def audio_stream(v: int | None = None):
"""
Extracts and streams audio from the currently active video entry.
Server-side volume control via the entry's 'vol' field (0-5 scale).
@ -165,7 +250,9 @@ async def audio_stream():
"""
queue = getattr(app.state, "queue", [])
idx = getattr(app.state, "current_index", 0)
entry = queue[idx] if queue else {}
if v is not None and 0 <= v < len(queue):
idx = v
entry = queue[idx] if queue and 0 <= idx < len(queue) else {}
vol_level = entry.get("vol", 1)
video_path = entry.get("video", "video.mp4")
@ -186,6 +273,7 @@ async def audio_stream():
process = subprocess.Popen(
[
"ffmpeg",
"-nostdin",
"-i", video_path,
"-vn",
"-filter:a", f"volume={ffmpeg_vol}",
@ -207,7 +295,12 @@ async def audio_stream():
yield chunk
finally:
process.stdout.close()
process.wait()
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
return StreamingResponse(
audio_generator(),
@ -229,6 +322,10 @@ async def websocket_endpoint(websocket: WebSocket):
# the original uncompressed binary protocol, byte-for-byte unchanged.
adaptive = websocket.query_params.get("codec") == "adaptive"
tolerance = getattr(app.state, "tolerance", 0) # lossy colour drift budget
origin = websocket.headers.get("origin")
if not _origin_allowed(origin, websocket.headers.get("host")):
await websocket.close(code=1008)
return
queue = getattr(app.state, "queue", [])
loop = getattr(app.state, "loop", False)
@ -305,7 +402,7 @@ async def websocket_endpoint(websocket: WebSocket):
effective_fps = source_fps
frame_t = 1.0 / effective_fps
await websocket.send_text(f"INIT:{effective_fps}:{render_mode}:{cols}:{rows}:{int(pixel_mode)}")
await websocket.send_text(f"INIT:{effective_fps}:{render_mode}:{cols}:{rows}:{int(pixel_mode)}:{queue_index}")
if skip_n > 1:
print(f"[FPS CAP] {source_fps} FPS → {effective_fps} FPS (skip every {skip_n} frames)")
@ -587,6 +684,7 @@ if __name__ == "__main__":
# ── Server ──
srv = parser.add_argument_group('\033[33mServer\033[0m')
srv.add_argument("--host", default="127.0.0.1", help="Bind address (default 127.0.0.1; use 0.0.0.0 to expose on LAN)")
srv.add_argument("--port", type=int, default=8000, help="Server port (default: 8000)")
srv.add_argument("--debug", action="store_true", default=False, help="Enable bandwidth debug logging (RAW vs WIRE)")
@ -661,10 +759,8 @@ if __name__ == "__main__":
target=uvicorn.run,
args=(app,),
kwargs={
"host": "0.0.0.0",
"host": args.host,
"port": args.port,
"ws_ping_interval": None,
"ws_ping_timeout": None,
"log_level": "warning",
},
daemon=True