diff --git a/app.js b/app.js
index dde6fd1..a0f97c2 100644
--- a/app.js
+++ b/app.js
@@ -40,6 +40,7 @@ function formatTime(seconds) {
 // ── STATE ──
 let state = 'IDLE'; // IDLE | PLAYING | PAUSED
 let ws = null;
+let bufferReportTimer = null; // periodic backlog report to the server (backpressure)
 const frameBuffer = [];
 const BUFFER_SIZE = 4;
 let codecDecoder = null; // Adaptive codec decoder (codec.js)
@@ -237,6 +238,7 @@ function connectWebSocket() {
     lastRenderTime = performance.now();
     lastFpsUpdate = lastRenderTime;
     requestAnimationFrame(renderFrame);
+    startBufferReports();
   };
 
   if (audioEl) {
@@ -421,8 +423,28 @@ function renderFrame(now) {
 // CLEANUP
 // ═══════════════════════════════════════
 
+// ── BACKPRESSURE REPORTING ──
+// Tell the server how many decoded frames are queued for render (frameBuffer
+// depth). When it grows the client is behind, and the server drops frames
+// server-side instead of making us decode (inflate + delta-patch) frames we
+// would only drop after. ~4 Hz is plenty: the server only needs a coarse signal.
+function startBufferReports() {
+  stopBufferReports();
+  bufferReportTimer = setInterval(() => {
+    if (ws && ws.readyState === WebSocket.OPEN && state === 'PLAYING') {
+      ws.send(JSON.stringify({ type: 'buffer', depth: frameBuffer.length }));
+    }
+  }, 250);
+}
+
+// Cancel the periodic report; a no-op when no timer is running.
+function stopBufferReports() {
+  if (bufferReportTimer) { clearInterval(bufferReportTimer); bufferReportTimer = null; }
+}
+
 function finishStream() {
   state = 'IDLE';
+  stopBufferReports();
   if (ws) { ws.onclose = null; ws.close(); ws = null; }
   if (audioEl) { audioEl.pause(); audioEl.src = ''; }
   ctx.clearRect(0, 0, canvas.width, canvas.height);
diff --git a/stream_server.py b/stream_server.py
index 8c3d448..b594af2 100644
--- a/stream_server.py
+++ b/stream_server.py
@@ -597,6 +597,31 @@ async def websocket_endpoint(websocket: WebSocket):
         buf = bytes(ascii_send_buf)
         return ('bytes', buf, pf, raw_sz, len(buf))
 
+    # ── BACKPRESSURE FRAME-DROP ──
+    # Cheaply advance the source by one effective frame WITHOUT decoding,
+    # processing, encoding, or sending it. Used when the client reports a
+    # growing backlog: we skip the frame instead of making the client pay
+    # the inflate+delta-patch cost for a frame it would only drop after.
+    # prev_frame is intentionally left untouched by the caller, so the next
+    # SENT frame is a correct delta across the gap (deltas are always
+    # relative to the last sent frame). Returns False at EOF.
+    def advance_one():
+        for _ in range(skip_n):
+            if not decoder.grab():
+                return False
+        return True
+
+    # Drop once the client's decoded-frame backlog exceeds this. The client
+    # render loop keeps a ~BUFFER_SIZE (4) jitter buffer, so 8 is one extra
+    # buffer of slack before we start shedding. MAX_CONSEC_DROPS guarantees
+    # liveness: we always send a real frame at least this often, so a stalled
+    # or non-reporting client can never be starved and a large delta gap is
+    # bounded.
+    BACKLOG_HIGH = 8
+    MAX_CONSEC_DROPS = max(1, int(round(effective_fps)))  # ~1s of frames
+    client_backlog = 0  # latest depth reported by the client (0 = unknown/healthy)
+    consec_drops = 0
+
     _loop = asyncio.get_event_loop()
 
     try:
@@ -615,11 +640,39 @@ async def websocket_endpoint(websocket: WebSocket):
                     frame_index = int(target_sec * effective_fps)
                     start_time = _loop.time() - (frame_index * frame_t)
                     bw_start_time = time.time()
+                    client_backlog = 0  # stale across a seek
+                    consec_drops = 0
+                elif msg.get("type") == "buffer":
+                    # Client's current decoded-frame backlog (frameBuffer.length).
+                    try:
+                        client_backlog = max(0, int(msg.get("depth", 0)))
+                    except (TypeError, ValueError, OverflowError):  # int(inf): e.g. depth 1e999 parses to inf
+                        client_backlog = 0
 
             if is_paused:
                 await asyncio.sleep(0.1)
                 continue
 
+            # ── BACKPRESSURE ──
+            # If the client is behind, skip this frame instead of sending one
+            # it will only decode-then-drop. Advancing the source keeps video
+            # time-aligned with the audio/wall clock; prev_frame is held so the
+            # next sent frame is a correct delta across the gap. MAX_CONSEC_DROPS
+            # caps the gap and guarantees we never starve the client.
+            if client_backlog > BACKLOG_HIGH and consec_drops < MAX_CONSEC_DROPS:
+                advanced = await _loop.run_in_executor(None, advance_one)
+                if not advanced:
+                    break
+                client_backlog -= 1  # optimistic; corrected by next report
+                consec_drops += 1
+                frame_index += 1
+                elapsed = _loop.time() - start_time
+                wait = (frame_index * frame_t) - elapsed
+                if wait > 0:
+                    await asyncio.sleep(wait)
+                continue
+            consec_drops = 0
+
             # ALL CPU work in thread pool — event loop stays 100% free
             result = await _loop.run_in_executor(
                 None, produce, prev_frame, frame_index)
diff --git a/test/_gap_fixture.py b/test/_gap_fixture.py
new file mode 100644
index 0000000..92cc0d9
--- /dev/null
+++ b/test/_gap_fixture.py
@@ -0,0 +1,83 @@
+"""
+Fixture generator for the backpressure frame-drop test (see test_backpressure_gap.js).
+
+Emits JSON on stdout describing two encodings of the SAME synthetic frame
+sequence, both ending on frame 4:
+
+  drop : keyframe(0), then frames 1-3 are DROPPED server-side (prev_frame held),
+         then frame 4 encoded as a delta against frame 0's shown state.
+  full : every frame 0..4 encoded against the previous shown frame.
+
+The decode side (codec.js, the shipped path) must reconstruct frame 4 bit-exact
+in BOTH cases -- that is the correctness claim behind server-side dropping:
+holding prev_frame across the gap keeps the delta chain exact. The drop case
+must also send strictly fewer messages, which is the whole point.
+
+Run standalone: python3 test/_gap_fixture.py
+"""
+import base64
+import json
+import os
+import sys
+
+import numpy as np
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+from codec import encode_frame, TAG_DELTA  # noqa: E402
+
+ROWS, COLS, C = 8, 8, 4  # ASCII colour: [char, R, G, B]
+
+
+def make_sequence():
+    """Deterministic frames where only a few cells change each step, so the
+    post-gap frame still differs from frame 0 by a small fraction (-> DELTA path)."""
+    rng = np.random.default_rng(1234)
+    base = rng.integers(0, 256, size=(ROWS, COLS, C), dtype=np.uint8)
+    frames = [base.copy()]
+    f = base.copy()
+    for step in range(1, 5):
+        f = f.copy()
+        # Mutate one distinct cell per step (structure + colour), small delta.
+        r, col = step % ROWS, (step * 3) % COLS
+        f[r, col, 0] = (int(f[r, col, 0]) + 7 * step) % 256  # char plane
+        f[r, col, 1:] = (f[r, col, 1:].astype(int) + 30 * step) % 256
+        frames.append(f)
+    return frames
+
+
+def b64(buf: bytes) -> str:
+    """Base64-encode ``buf`` and return the result as an ASCII ``str``."""
+    return base64.b64encode(buf).decode("ascii")
+
+
+def main():
+    """Encode the drop and full variants of the sequence and write the fixture JSON to stdout."""
+    frames = make_sequence()
+    expected = frames[4]
+
+    # ── DROP path: keyframe 0, drop 1-3 (hold prev), delta 4 vs frame 0 ──
+    msg0, shown0 = encode_frame(frames[0], None, 0, tolerance=0)
+    msg4_drop, _ = encode_frame(frames[4], shown0, 4, tolerance=0)
+    drop_tag = msg4_drop[4]  # byte after the 4-byte frame index
+
+    # ── FULL path: every frame against the previous shown ──
+    full_msgs = []
+    prev = None
+    for i, fr in enumerate(frames):
+        m, prev = encode_frame(fr, prev, i, tolerance=0)
+        full_msgs.append(b64(m))
+
+    out = {
+        "cellBytes": C,
+        "rows": ROWS,
+        "cols": COLS,
+        "expected": b64(expected.tobytes()),
+        "drop": {"messages": [b64(msg0), b64(msg4_drop)], "gapTag": drop_tag},
+        "full": {"messages": full_msgs},
+        "delta_tag": TAG_DELTA,
+    }
+    json.dump(out, sys.stdout)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/test/test_backpressure_gap.js b/test/test_backpressure_gap.js
new file mode 100644
index 0000000..2aad34c
--- /dev/null
+++ b/test/test_backpressure_gap.js
@@ -0,0 +1,86 @@
+/**
+ * Backpressure frame-drop correctness test (issue #30).
+ *
+ * Proves the claim behind server-side frame dropping: when the server drops
+ * frames for a slow client, it holds prev_frame across the gap, so the next
+ * SENT frame is a delta against the last sent frame. Decoding through the
+ * SHIPPED codec.js must reconstruct that post-gap frame bit-exact, identical to
+ * the no-drop path -- the client just decodes fewer frames to get there.
+ *
+ * Encoding is done by codec.py via _gap_fixture.py (the encoder only exists in
+ * Python); decoding uses codec.js, so this exercises the real Python<->JS path.
+ *
+ * Usage: node test/test_backpressure_gap.js
+ */
+const { execFileSync } = require('child_process');
+const path = require('path');
+const codec = require('../codec.js');
+
+/** Decode a base64 string into a Uint8Array. */
+function b64ToU8(s) {
+  return new Uint8Array(Buffer.from(s, 'base64'));
+}
+
+/** Run every message through one decoder instance; returns the last decoded frame (null if none). */
+async function decodeLast(messagesB64, cellBytes) {
+  const decoder = codec.makeDecoder(cellBytes);
+  let last = null;
+  for (const m of messagesB64) {
+    // decode() expects an ArrayBuffer (as WebSocket delivers); give it a fresh one.
+    const u8 = b64ToU8(m);
+    const ab = u8.buffer.slice(u8.byteOffset, u8.byteOffset + u8.byteLength);
+    last = (await decoder.decode(ab)).frame;
+  }
+  return last;
+}
+
+/** Byte-wise comparison; on failure `why` names the length mismatch or the first differing index. */
+function eq(a, b) {
+  if (a.length !== b.length) return { ok: false, why: `len ${a.length} != ${b.length}` };
+  for (let i = 0; i < a.length; i++) {
+    if (a[i] !== b[i]) return { ok: false, why: `byte ${i}: ${a[i]} != ${b[i]}` };
+  }
+  return { ok: true };
+}
+
+(async () => {
+  const fixturePath = path.join(__dirname, '_gap_fixture.py');
+  const raw = execFileSync('python3', [fixturePath], { encoding: 'utf8' });
+  const fx = JSON.parse(raw);
+
+  const expected = b64ToU8(fx.expected);
+  const checks = [];
+
+  // 1. Drop path reconstructs frame 4 bit-exact (prev_frame held across the gap).
+  const dropFrame = await decodeLast(fx.drop.messages, fx.cellBytes);
+  const dropEq = eq(dropFrame, expected);
+  checks.push(['drop path decodes frame 4 bit-exact', dropEq.ok, dropEq.why]);
+
+  // 2. The post-gap frame is a real DELTA, not a fallback keyframe -- otherwise
+  //    we wouldn't be exercising the held-prev_frame delta path at all.
+  checks.push([
+    `post-gap frame is a DELTA (tag ${fx.delta_tag})`,
+    fx.drop.gapTag === fx.delta_tag,
+    `gapTag=${fx.drop.gapTag} expected=${fx.delta_tag}`,
+  ]);
+
+  // 3. No-drop path also reconstructs frame 4 bit-exact (sanity / same endpoint).
+  const fullFrame = await decodeLast(fx.full.messages, fx.cellBytes);
+  const fullEq = eq(fullFrame, expected);
+  checks.push(['full path decodes frame 4 bit-exact', fullEq.ok, fullEq.why]);
+
+  // 4. The win: drop path makes the client decode strictly fewer frames.
+  checks.push([
+    'drop path sends fewer frames than full path',
+    fx.drop.messages.length < fx.full.messages.length,
+    `drop=${fx.drop.messages.length} full=${fx.full.messages.length}`,
+  ]);
+
+  let failed = 0;
+  for (const [name, ok, why] of checks) {
+    console.log(`${ok ? 'PASS' : 'FAIL'} ${name}${ok ? '' : ' -> ' + why}`);
+    if (!ok) failed++;
+  }
+  console.log(`\n${checks.length - failed}/${checks.length} passed`);
+  process.exit(failed === 0 ? 0 : 1);
+})().catch((e) => { console.error('ERROR', e); process.exit(2); });