Implement pre-rendering buffer and background encoder thread

Overhauled the backend streaming loop in `stream_server.py` to use a threading queue for asynchronous frame encoding. This decouples heavy OpenCV work from the streaming loop, enabling smoother, high-framerate playback.
Updated `app.js` to buffer ~5 seconds of video stream natively before rendering, mitigating any frame drops from latency or encoder spikes.
This commit is contained in:
taisrisk 2026-06-15 16:34:55 +00:00
parent 318e6daab4
commit 025b17e9e4
2 changed files with 152 additions and 86 deletions

71
app.js
View file

@ -19,7 +19,8 @@ const volumeSlider = document.getElementById('volume-slider');
let state = 'IDLE'; // IDLE | PLAYING | PAUSED let state = 'IDLE'; // IDLE | PLAYING | PAUSED
let ws = null; let ws = null;
const frameBuffer = []; const frameBuffer = [];
const BUFFER_SIZE = 2; // Reduced buffer size for lower latency const TARGET_PRE_RENDER_SECONDS = 5; // 5 seconds of pre-rendering
let PRE_RENDER_BUFFER_SIZE = 150; // calculated as targetFps * 5
let codecDecoder = null; // Adaptive codec decoder (codec.js) let codecDecoder = null; // Adaptive codec decoder (codec.js)
let targetFps = 24; let targetFps = 24;
let frameInterval = 1000 / targetFps; let frameInterval = 1000 / targetFps;
@ -185,42 +186,74 @@ function connectWebSocket() {
codecDecoder = null; codecDecoder = null;
} }
// ── AUDIO READY GATE ── PRE_RENDER_BUFFER_SIZE = Math.ceil(targetFps * TARGET_PRE_RENDER_SECONDS);
// Buffer video frames but don't render until audio is ready.
// This prevents the 0.5s initial stutter. // ── PRE-RENDER / AUDIO READY GATE ──
// Buffer video frames for TARGET_PRE_RENDER_SECONDS before rendering
// to guarantee smooth streaming.
readyToRender = false; readyToRender = false;
state = 'PLAYING'; state = 'BUFFERING';
statusEl.textContent = `Pre-rendering... 0%`;
const beginRendering = () => { const beginRendering = () => {
state = 'PLAYING';
readyToRender = true; readyToRender = true;
streamStartTime = performance.now(); streamStartTime = performance.now() - (frameBuffer[0]?.time * 1000 || 0); // sync start to first buffered frame
lastRenderTime = performance.now(); lastRenderTime = performance.now();
lastFpsUpdate = lastRenderTime; lastFpsUpdate = lastRenderTime;
requestAnimationFrame(renderFrame); requestAnimationFrame(renderFrame);
}; };
let audioReady = false;
if (audioEl) { if (audioEl) {
audioEl.pause(); audioEl.pause();
const qs = currentQueueIndex !== null ? `?v=${currentQueueIndex}&` : '?'; const qs = currentQueueIndex !== null ? `?v=${currentQueueIndex}&` : '?';
audioEl.src = `/audio${qs}t=${Date.now()}`; audioEl.src = `/audio${qs}t=${Date.now()}`;
audioEl.volume = volumeSlider ? volumeSlider.value : 1.0; audioEl.volume = volumeSlider ? volumeSlider.value : 1.0;
audioEl.load(); audioEl.load();
audioEl.play().catch(() => {});
// Wait for audio to actually start playing const tryStart = () => {
if (audioReady && frameBuffer.length >= PRE_RENDER_BUFFER_SIZE && state === 'BUFFERING') {
audioEl.play().catch(() => {});
beginRendering();
}
};
if (audioEl.readyState >= 3) { if (audioEl.readyState >= 3) {
beginRendering(); audioReady = true;
tryStart();
} else { } else {
audioEl.addEventListener('playing', beginRendering, { once: true }); audioEl.addEventListener('canplay', () => {
// Fallback: if audio fails to load (vol=0 / 204), start after 500ms audioReady = true;
tryStart();
}, { once: true });
// Fallback: if audio fails to load (vol=0 / 204), mark as ready after 500ms
setTimeout(() => { setTimeout(() => {
if (!readyToRender) beginRendering(); audioReady = true;
tryStart();
}, 500); }, 500);
} }
} else { } else {
// No audio element at all → start immediately audioReady = true;
beginRendering();
} }
// Interval to check if pre-rendering is complete
const bufferCheckInterval = setInterval(() => {
if (state !== 'BUFFERING') {
clearInterval(bufferCheckInterval);
return;
}
const percent = Math.min(100, Math.round((frameBuffer.length / PRE_RENDER_BUFFER_SIZE) * 100));
statusEl.textContent = `Pre-rendering... ${percent}%`;
if (audioReady && frameBuffer.length >= PRE_RENDER_BUFFER_SIZE) {
clearInterval(bufferCheckInterval);
beginRendering();
}
}, 50);
return; return;
} }
@ -249,10 +282,11 @@ function connectWebSocket() {
} }
} }
while (frameBuffer.length > BUFFER_SIZE * 3) frameBuffer.shift(); // Cap max buffer to 2x our pre-render requirement to prevent memory leaks
while (frameBuffer.length > PRE_RENDER_BUFFER_SIZE * 2) frameBuffer.shift();
}; };
ws.onopen = () => { statusEl.textContent = 'Buffering...'; }; ws.onopen = () => { statusEl.textContent = 'Connecting...'; };
ws.onclose = () => { ws.onclose = () => {
if (state === 'PLAYING' || state === 'PAUSED') { if (state === 'PLAYING' || state === 'PAUSED') {
@ -289,13 +323,12 @@ function renderFrame(now) {
if (frameBuffer.length === 0) return; if (frameBuffer.length === 0) return;
// A/V Sync: Drop frames that are too far behind the master clock (catch up) // A/V Sync: Drop frames that are too far behind the master clock (catch up)
// Made more aggressive for lower latency live playback while (frameBuffer.length > 1 && frameBuffer[0].time < masterClock - 0.1) {
while (frameBuffer.length > 1 && frameBuffer[0].time < masterClock - 0.05) {
frameBuffer.shift(); frameBuffer.shift();
} }
// A/V Sync: Wait if the frame is in the future // A/V Sync: Wait if the frame is in the future
if (frameBuffer[0].time > masterClock + 0.02) { if (frameBuffer[0].time > masterClock + 0.05) {
return; return;
} }

View file

@ -15,6 +15,8 @@ import subprocess
import json import json
import numpy as np import numpy as np
import cv2 import cv2
import queue
import threading
from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, StreamingResponse, FileResponse from fastapi.responses import HTMLResponse, StreamingResponse, FileResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
@ -386,81 +388,111 @@ async def websocket_endpoint(websocket: WebSocket):
bw_raw_bytes = 0 bw_raw_bytes = 0
debug_mode = getattr(app.state, "debug", False) debug_mode = getattr(app.state, "debug", False)
frame_index = 0 frame_index = 0
prev_frame = None # previous framebuffer snapshot for delta coding
# Pre-allocate send buffer WITH header space to avoid per-frame concat # ── ASYNC PRE-ENCODING BUFFER THREAD ──
if pixel_mode: # We encode frames ahead of time into a thread-safe queue.
# Zero-Copy Pixel: 4-byte header + raw BGR (3 bytes per pixel) # This allows the Python event loop to stream asynchronously without being blocked by OpenCV.
pixel_send_buf = bytearray(4 + rows * cols * 3) # We buffer up to 150 frames.
elif render_mode > 1: frame_queue = queue.Queue(maxsize=150)
# ASCII Color: 4-byte header + [char,R,G,B] per pixel encode_thread_active = True
ascii_send_buf = bytearray(4 + rows * cols * 4)
raw_frame_num = 0 def encode_loop():
try: nonlocal decoder
while True: prev_frame = None # previous framebuffer snapshot for delta coding
# ── FPS DECIMATION via grab() ──
# For 60→30 fps: grab (skip) 1 frame, then decode 1 frame.
# grab() is ~10x faster than read() because it skips decoding.
for _ in range(skip_n - 1):
if not decoder.grab():
break # EOF reached during skip
try: # Pre-allocate send buffer WITH header space to avoid per-frame concat
gray_frame, bgr_frame = next(decoder) if pixel_mode:
except StopIteration: # Zero-Copy Pixel: 4-byte header + raw BGR (3 bytes per pixel)
break pixel_send_buf = bytearray(4 + rows * cols * 3)
elif render_mode > 1:
# ASCII Color: 4-byte header + [char,R,G,B] per pixel
ascii_send_buf = bytearray(4 + rows * cols * 4)
if pixel_mode: enc_frame_index = 0
# ── PIXEL MODE: raw BGR (3 bytes/cell) ── try:
raw_size = 4 + rows * cols * 3 while encode_thread_active:
if adaptive: # ── FPS DECIMATION via grab() ──
msg, prev_frame = encode_frame( # For 60→30 fps: grab (skip) 1 frame, then decode 1 frame.
np.ascontiguousarray(bgr_frame), # grab() is ~10x faster than read() because it skips decoding.
prev_frame, frame_index, tolerance=tolerance) for _ in range(skip_n - 1):
await websocket.send_bytes(msg) if not decoder.grab():
bw_bytes_sent += len(msg) break # EOF reached during skip
bw_raw_bytes += raw_size
else:
# ── ZERO-COPY PIXEL MODE (legacy) ──
struct.pack_into(">I", pixel_send_buf, 0, frame_index)
pixel_send_buf[4:] = bgr_frame.tobytes()
await websocket.send_bytes(bytes(pixel_send_buf))
bw_bytes_sent += len(pixel_send_buf)
bw_raw_bytes += len(pixel_send_buf)
else:
indices = np.floor_divide(gray_frame, max(1, 256 // mapper._n))
np.clip(indices, 0, mapper._n - 1, out=indices)
if render_mode == 1: try:
char_matrix = mapper._lut[indices] gray_frame, bgr_frame = next(decoder)
lines = [''.join(row) for row in char_matrix] except StopIteration:
payload = f"{frame_index}\n" + '\n'.join(lines) frame_queue.put((None, None, None)) # EOF marker
await websocket.send_text(payload) break
payload_size = len(payload.encode('utf-8'))
bw_bytes_sent += payload_size if pixel_mode:
bw_raw_bytes += payload_size # ── PIXEL MODE: raw BGR (3 bytes/cell) ──
else: raw_size = 4 + rows * cols * 3
char_codes = char_byte_lut[indices]
rgb = bgr_frame[:, :, ::-1]
if qb > 0:
rgb = (rgb >> qb) << qb
frame_buf[:, :, 0] = char_codes
frame_buf[:, :, 1:] = rgb
raw_size = 4 + rows * cols * 4
if adaptive: if adaptive:
msg, prev_frame = encode_frame( msg, prev_frame = encode_frame(
frame_buf, prev_frame, frame_index, np.ascontiguousarray(bgr_frame),
tolerance=tolerance) prev_frame, enc_frame_index, tolerance=tolerance)
await websocket.send_bytes(msg) frame_queue.put((msg, raw_size, True)) # (data, raw_size, is_binary)
bw_bytes_sent += len(msg)
bw_raw_bytes += raw_size
else: else:
struct.pack_into(">I", ascii_send_buf, 0, frame_index) # ── ZERO-COPY PIXEL MODE (legacy) ──
ascii_send_buf[4:] = frame_buf.tobytes() struct.pack_into(">I", pixel_send_buf, 0, enc_frame_index)
await websocket.send_bytes(bytes(ascii_send_buf)) pixel_send_buf[4:] = bgr_frame.tobytes()
bw_bytes_sent += len(ascii_send_buf) frame_queue.put((bytes(pixel_send_buf), raw_size, True))
bw_raw_bytes += len(ascii_send_buf) else:
indices = np.floor_divide(gray_frame, max(1, 256 // mapper._n))
np.clip(indices, 0, mapper._n - 1, out=indices)
if render_mode == 1:
char_matrix = mapper._lut[indices]
lines = [''.join(row) for row in char_matrix]
payload = f"{enc_frame_index}\n" + '\n'.join(lines)
payload_size = len(payload.encode('utf-8'))
frame_queue.put((payload, payload_size, False))
else:
char_codes = char_byte_lut[indices]
rgb = bgr_frame[:, :, ::-1]
if qb > 0:
rgb = (rgb >> qb) << qb
frame_buf[:, :, 0] = char_codes
frame_buf[:, :, 1:] = rgb
raw_size = 4 + rows * cols * 4
if adaptive:
msg, prev_frame = encode_frame(
frame_buf, prev_frame, enc_frame_index,
tolerance=tolerance)
frame_queue.put((msg, raw_size, True))
else:
struct.pack_into(">I", ascii_send_buf, 0, enc_frame_index)
ascii_send_buf[4:] = frame_buf.tobytes()
frame_queue.put((bytes(ascii_send_buf), raw_size, True))
enc_frame_index += 1
except Exception as e:
print(f"Encode thread error: {e}")
frame_queue.put((None, None, None))
encoder_thread = threading.Thread(target=encode_loop, daemon=True)
encoder_thread.start()
try:
while True:
# Non-blocking get to yield back to event loop if queue is empty
try:
data, raw_size, is_binary = frame_queue.get_nowait()
except queue.Empty:
await asyncio.sleep(0.001)
continue
if data is None:
break # EOF marker
if is_binary:
await websocket.send_bytes(data)
bw_bytes_sent += len(data)
bw_raw_bytes += raw_size
else:
await websocket.send_text(data)
bw_bytes_sent += len(data.encode('utf-8'))
bw_raw_bytes += raw_size
current_time = time.time() current_time = time.time()
if debug_mode and current_time - bw_start_time >= 1.0: if debug_mode and current_time - bw_start_time >= 1.0:
@ -483,6 +515,7 @@ async def websocket_endpoint(websocket: WebSocket):
frame_index += 1 frame_index += 1
finally: finally:
encode_thread_active = False
decoder.release() decoder.release()
# Video finished → advance queue # Video finished → advance queue