Merge pull request #31 from 1Aa1k/feat/server-backpressure

Server-side frame dropping under client backpressure (fixes #30)
This commit is contained in:
SteadyW 2026-06-23 22:19:30 +03:00 committed by GitHub
commit ab39f1a5f4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 388 additions and 0 deletions

21
app.js
View file

@ -40,6 +40,7 @@ function formatTime(seconds) {
// ── STATE ──
let state = 'IDLE'; // IDLE | PLAYING | PAUSED
let ws = null;
let bufferReportTimer = null; // periodic backlog report to the server (backpressure)
const frameBuffer = [];
const BUFFER_SIZE = 4;
let codecDecoder = null; // Adaptive codec decoder (codec.js)
@ -237,6 +238,7 @@ function connectWebSocket() {
lastRenderTime = performance.now();
lastFpsUpdate = lastRenderTime;
requestAnimationFrame(renderFrame);
startBufferReports();
};
if (audioEl) {
@ -421,8 +423,27 @@ function renderFrame(now) {
// CLEANUP
// ═══════════════════════════════════════
// ── BACKPRESSURE REPORTING ──
// Tell the server how many decoded frames are queued for render (frameBuffer
// depth). When it grows the client is behind, and the server drops frames
// server-side instead of making us decode (inflate + delta-patch) frames we
// would only drop after. ~4 Hz is plenty: the server only needs a coarse signal.
function startBufferReports() {
stopBufferReports();
bufferReportTimer = setInterval(() => {
if (ws && ws.readyState === WebSocket.OPEN && state === 'PLAYING') {
ws.send(JSON.stringify({ type: 'buffer', depth: frameBuffer.length }));
}
}, 250);
}
function stopBufferReports() {
if (bufferReportTimer) { clearInterval(bufferReportTimer); bufferReportTimer = null; }
}
function finishStream() {
state = 'IDLE';
stopBufferReports();
if (ws) { ws.onclose = null; ws.close(); ws = null; }
if (audioEl) { audioEl.pause(); audioEl.src = ''; }
ctx.clearRect(0, 0, canvas.width, canvas.height);

View file

@ -597,6 +597,31 @@ async def websocket_endpoint(websocket: WebSocket):
buf = bytes(ascii_send_buf)
return ('bytes', buf, pf, raw_sz, len(buf))
# ── BACKPRESSURE FRAME-DROP ──
# Cheaply advance the source by one effective frame WITHOUT decoding,
# processing, encoding, or sending it. Used when the client reports a
# growing backlog: we skip the frame instead of making the client pay
# the inflate+delta-patch cost for a frame it would only drop after.
# prev_frame is intentionally left untouched by the caller, so the next
# SENT frame is a correct delta across the gap (deltas are always
# relative to the last sent frame). Returns False at EOF.
def advance_one():
for _ in range(skip_n):
if not decoder.grab():
return False
return True
# Drop once the client's decoded-frame backlog exceeds this. The client
# render loop keeps a ~BUFFER_SIZE (4) jitter buffer, so 8 is one extra
# buffer of slack before we start shedding. MAX_CONSEC_DROPS guarantees
# liveness: we always send a real frame at least this often, so a stalled
# or non-reporting client can never be starved and a large delta gap is
# bounded.
BACKLOG_HIGH = 8
MAX_CONSEC_DROPS = max(1, int(round(effective_fps))) # ~1s of frames
client_backlog = 0 # latest depth reported by the client (0 = unknown/healthy)
consec_drops = 0
_loop = asyncio.get_event_loop()
try:
@ -615,11 +640,39 @@ async def websocket_endpoint(websocket: WebSocket):
frame_index = int(target_sec * effective_fps)
start_time = _loop.time() - (frame_index * frame_t)
bw_start_time = time.time()
client_backlog = 0 # stale across a seek
consec_drops = 0
elif msg.get("type") == "buffer":
# Client's current decoded-frame backlog (frameBuffer.length).
try:
client_backlog = max(0, int(msg.get("depth", 0)))
except (TypeError, ValueError):
client_backlog = 0
if is_paused:
await asyncio.sleep(0.1)
continue
# ── BACKPRESSURE ──
# If the client is behind, skip this frame instead of sending one
# it will only decode-then-drop. Advancing the source keeps video
# time-aligned with the audio/wall clock; prev_frame is held so the
# next sent frame is a correct delta across the gap. MAX_CONSEC_DROPS
# caps the gap and guarantees we never starve the client.
if client_backlog > BACKLOG_HIGH and consec_drops < MAX_CONSEC_DROPS:
advanced = await _loop.run_in_executor(None, advance_one)
if not advanced:
break
client_backlog -= 1 # optimistic; corrected by next report
consec_drops += 1
frame_index += 1
elapsed = _loop.time() - start_time
wait = (frame_index * frame_t) - elapsed
if wait > 0:
await asyncio.sleep(wait)
continue
consec_drops = 0
# ALL CPU work in thread pool — event loop stays 100% free
result = await _loop.run_in_executor(
None, produce, prev_frame, frame_index)

81
test/_gap_fixture.py Normal file
View file

@ -0,0 +1,81 @@
"""
Fixture generator for the backpressure frame-drop test (see test_backpressure_gap.js).
Emits JSON on stdout describing two encodings of the SAME synthetic frame
sequence, both ending on frame 4:
drop : keyframe(0), then frames 1-3 are DROPPED server-side (prev_frame held),
then frame 4 encoded as a delta against frame 0's shown state.
full : every frame 0..4 encoded against the previous shown frame.
The decode side (codec.js, the shipped path) must reconstruct frame 4 bit-exact
in BOTH cases -- that is the correctness claim behind server-side dropping:
holding prev_frame across the gap keeps the delta chain exact. The drop case
must also send strictly fewer messages, which is the whole point.
Run standalone: python3 test/_gap_fixture.py
"""
import base64
import json
import os
import sys
import numpy as np
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from codec import encode_frame, TAG_DELTA # noqa: E402
ROWS, COLS, C = 8, 8, 4 # ASCII colour: [char, R, G, B]
def make_sequence():
"""Deterministic frames where only a few cells change each step, so the
post-gap frame still differs from frame 0 by a small fraction (-> DELTA path)."""
rng = np.random.default_rng(1234)
base = rng.integers(0, 256, size=(ROWS, COLS, C), dtype=np.uint8)
frames = [base.copy()]
f = base.copy()
for step in range(1, 5):
f = f.copy()
# Mutate one distinct cell per step (structure + colour), small delta.
r, col = step % ROWS, (step * 3) % COLS
f[r, col, 0] = (int(f[r, col, 0]) + 7 * step) % 256 # char plane
f[r, col, 1:] = (f[r, col, 1:].astype(int) + 30 * step) % 256
frames.append(f)
return frames
def b64(buf: bytes) -> str:
return base64.b64encode(buf).decode("ascii")
def main():
frames = make_sequence()
expected = frames[4]
# ── DROP path: keyframe 0, drop 1-3 (hold prev), delta 4 vs frame 0 ──
msg0, shown0 = encode_frame(frames[0], None, 0, tolerance=0)
msg4_drop, _ = encode_frame(frames[4], shown0, 4, tolerance=0)
drop_tag = msg4_drop[4] # byte after the 4-byte frame index
# ── FULL path: every frame against the previous shown ──
full_msgs = []
prev = None
for i, fr in enumerate(frames):
m, prev = encode_frame(fr, prev, i, tolerance=0)
full_msgs.append(b64(m))
out = {
"cellBytes": C,
"rows": ROWS,
"cols": COLS,
"expected": b64(expected.tobytes()),
"drop": {"messages": [b64(msg0), b64(msg4_drop)], "gapTag": drop_tag},
"full": {"messages": full_msgs},
"delta_tag": TAG_DELTA,
}
json.dump(out, sys.stdout)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,83 @@
/**
* Backpressure frame-drop correctness test (issue #30).
*
* Proves the claim behind server-side frame dropping: when the server drops
* frames for a slow client, it holds prev_frame across the gap, so the next
* SENT frame is a delta against the last sent frame. Decoding through the
* SHIPPED codec.js must reconstruct that post-gap frame bit-exact, identical to
* the no-drop path -- the client just decodes fewer frames to get there.
*
* Encoding is done by codec.py via _gap_fixture.py (the encoder only exists in
* Python); decoding uses codec.js, so this exercises the real Python<->JS path.
*
* Usage: node test/test_backpressure_gap.js
*/
const { execFileSync } = require('child_process');
const path = require('path');
const codec = require('../codec.js');
function b64ToU8(s) {
return new Uint8Array(Buffer.from(s, 'base64'));
}
async function decodeLast(messagesB64, cellBytes) {
const decoder = codec.makeDecoder(cellBytes);
let last = null;
for (const m of messagesB64) {
// decode() expects an ArrayBuffer (as WebSocket delivers); give it a fresh one.
const u8 = b64ToU8(m);
const ab = u8.buffer.slice(u8.byteOffset, u8.byteOffset + u8.byteLength);
last = (await decoder.decode(ab)).frame;
}
return last;
}
function eq(a, b) {
if (a.length !== b.length) return { ok: false, why: `len ${a.length} != ${b.length}` };
for (let i = 0; i < a.length; i++) {
if (a[i] !== b[i]) return { ok: false, why: `byte ${i}: ${a[i]} != ${b[i]}` };
}
return { ok: true };
}
(async () => {
const fixturePath = path.join(__dirname, '_gap_fixture.py');
const raw = execFileSync('python3', [fixturePath], { encoding: 'utf8' });
const fx = JSON.parse(raw);
const expected = b64ToU8(fx.expected);
const checks = [];
// 1. Drop path reconstructs frame 4 bit-exact (prev_frame held across the gap).
const dropFrame = await decodeLast(fx.drop.messages, fx.cellBytes);
const dropEq = eq(dropFrame, expected);
checks.push(['drop path decodes frame 4 bit-exact', dropEq.ok, dropEq.why]);
// 2. The post-gap frame is a real DELTA, not a fallback keyframe -- otherwise
// we wouldn't be exercising the held-prev_frame delta path at all.
checks.push([
'post-gap frame is a DELTA (tag 2)',
fx.drop.gapTag === fx.delta_tag,
`gapTag=${fx.drop.gapTag} expected=${fx.delta_tag}`,
]);
// 3. No-drop path also reconstructs frame 4 bit-exact (sanity / same endpoint).
const fullFrame = await decodeLast(fx.full.messages, fx.cellBytes);
const fullEq = eq(fullFrame, expected);
checks.push(['full path decodes frame 4 bit-exact', fullEq.ok, fullEq.why]);
// 4. The win: drop path makes the client decode strictly fewer frames.
checks.push([
'drop path sends fewer frames than full path',
fx.drop.messages.length < fx.full.messages.length,
`drop=${fx.drop.messages.length} full=${fx.full.messages.length}`,
]);
let failed = 0;
for (const [name, ok, why] of checks) {
console.log(`${ok ? 'PASS' : 'FAIL'} ${name}${ok ? '' : ' -> ' + why}`);
if (!ok) failed++;
}
console.log(`\n${checks.length - failed}/${checks.length} passed`);
process.exit(failed === 0 ? 0 : 1);
})().catch((e) => { console.error('ERROR', e); process.exit(2); });

View file

@ -0,0 +1,150 @@
/**
* Live behavioral test for server-side frame dropping (issue #30).
*
* Unlike test_backpressure_gap.js (which proves the codec stays bit-exact across
* a gap), this one runs the REAL server loop end-to-end and proves the drop
* mechanism actually fires: a client that reports a high decode backlog receives
* a stream with SKIPPED frame indices, while a client reporting zero backlog
* receives every frame in order.
*
* It generates a short clip with ffmpeg, launches stream_server.py, then opens
* two WebSocket clients over the same wall-clock window and compares what each
* received. Frame indices are read straight from the 4-byte big-endian header,
* so no decode is needed.
*
* Requires: ffmpeg, and a Python with the server deps (fastapi/uvicorn/opencv).
* Override the interpreter with ASCIL_PY (e.g. ASCIL_PY=/data/ascil-venv/bin/python).
*
* Usage: node test/test_backpressure_live.js
*/
const { spawn, execFileSync } = require('child_process');
const fs = require('fs');
const os = require('os');
const net = require('net');
const path = require('path');
const PY = process.env.ASCIL_PY || 'python3';
const REPO = path.dirname(__dirname);
const WINDOW_MS = 2500; // collection window per client
const HIGH_BACKLOG = 50; // well above the server's BACKLOG_HIGH (8)
function freePort() {
return new Promise((resolve, reject) => {
const srv = net.createServer();
srv.listen(0, '127.0.0.1', () => {
const port = srv.address().port;
srv.close(() => resolve(port));
});
srv.on('error', reject);
});
}
function waitForPort(port, timeoutMs) {
const deadline = Date.now() + timeoutMs;
return new Promise((resolve, reject) => {
const tryOnce = () => {
const sock = net.connect(port, '127.0.0.1');
sock.on('connect', () => { sock.destroy(); resolve(); });
sock.on('error', () => {
sock.destroy();
if (Date.now() > deadline) reject(new Error('server did not start'));
else setTimeout(tryOnce, 150);
});
};
tryOnce();
});
}
// Collect frame indices for WINDOW_MS. If reportDepth is set, spam buffer reports.
function collect(port, reportDepth) {
return new Promise((resolve, reject) => {
const ws = new WebSocket(`ws://127.0.0.1:${port}/ws?codec=adaptive`);
ws.binaryType = 'arraybuffer';
const indices = [];
let reporter = null, timer = null;
const stop = () => {
if (reporter) clearInterval(reporter);
if (timer) clearTimeout(timer);
try { ws.close(); } catch (_) {}
resolve(indices);
};
ws.onopen = () => {
timer = setTimeout(stop, WINDOW_MS);
if (reportDepth != null) {
const send = () => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'buffer', depth: reportDepth }));
}
};
send();
reporter = setInterval(send, 100);
}
};
ws.onmessage = (ev) => {
if (typeof ev.data === 'string') return; // INIT / status
indices.push(new DataView(ev.data).getUint32(0, false));
};
ws.onerror = (e) => { if (timer) clearTimeout(timer); reject(e.error || new Error('ws error')); };
});
}
function maxGap(indices) {
let m = 0;
for (let i = 1; i < indices.length; i++) m = Math.max(m, indices[i] - indices[i - 1]);
return m;
}
(async () => {
const tmp = fs.mkdtempSync(path.join(os.tmpdir(), 'ascil-bp-'));
const clip = path.join(tmp, 'clip.mp4');
let server = null;
try {
// 6s of moving content at 24fps so consecutive frames differ (real deltas).
execFileSync('ffmpeg', [
'-y', '-f', 'lavfi', '-i', 'testsrc=size=160x120:rate=24:duration=6',
'-pix_fmt', 'yuv420p', clip,
], { stdio: 'ignore' });
const port = await freePort();
// stdin must stay OPEN: the server runs an interactive command loop on the
// main thread (uvicorn is a daemon thread), and EOF on stdin kills it.
server = spawn(PY, ['stream_server.py', clip, '--mode', '2', '--vol', '0',
'--cols', '80', '--no-thumbnails', '--host', '127.0.0.1', '--port', String(port)],
{ cwd: REPO, stdio: ['pipe', 'ignore', 'ignore'] });
server.on('error', (e) => { throw e; });
await waitForPort(port, 15000);
// Control first (every frame), then backpressure (high backlog). Each ws
// connection replays the clip from index 0.
const control = await collect(port, 0);
const slow = await collect(port, HIGH_BACKLOG);
const checks = [
['control client received frames', control.length > 5, `got ${control.length}`],
['control stream is contiguous (no server drops)', maxGap(control) <= 1,
`maxGap=${maxGap(control)}`],
['backpressure client received some frames (not starved)', slow.length > 0,
`got ${slow.length}`],
['backpressure stream has skipped indices (drops fired)', maxGap(slow) > 1,
`maxGap=${maxGap(slow)}`],
['backpressure received fewer frames than control', slow.length < control.length,
`slow=${slow.length} control=${control.length}`],
];
let failed = 0;
for (const [name, ok, why] of checks) {
console.log(`${ok ? 'PASS' : 'FAIL'} ${name}${ok ? '' : ' -> ' + why}`);
if (!ok) failed++;
}
console.log(`\ncontrol: ${control.length} frames (maxGap ${maxGap(control)}) | ` +
`backpressure: ${slow.length} frames (maxGap ${maxGap(slow)})`);
console.log(`${checks.length - failed}/${checks.length} passed`);
process.exitCode = failed === 0 ? 0 : 1;
} finally {
if (server) server.kill('SIGKILL');
fs.rmSync(tmp, { recursive: true, force: true });
}
})().catch((e) => { console.error('ERROR', e); process.exit(2); });