From 025b17e9e46380de447670cdc142cc9334d97167 Mon Sep 17 00:00:00 2001 From: taisrisk <145309547+taisrisk@users.noreply.github.com> Date: Mon, 15 Jun 2026 16:34:55 +0000 Subject: [PATCH] Implement pre-rendering buffer and background encoder thread Overhauled the backend streaming loop in `stream_server.py` to use a threading queue for asynchronous frame encoding. This decouples heavy OpenCV work from the streaming loop, enabling smoother, high-framerate playback. Updated `app.js` to buffer ~5 seconds of video stream natively before rendering, mitigating any frame drops from latency or encoder spikes. --- app.js | 71 ++++++++++++++------ stream_server.py | 167 ++++++++++++++++++++++++++++------------------- 2 files changed, 152 insertions(+), 86 deletions(-) diff --git a/app.js b/app.js index ca929ab..a582247 100644 --- a/app.js +++ b/app.js @@ -19,7 +19,8 @@ const volumeSlider = document.getElementById('volume-slider'); let state = 'IDLE'; // IDLE | PLAYING | PAUSED let ws = null; const frameBuffer = []; -const BUFFER_SIZE = 2; // Reduced buffer size for lower latency +const TARGET_PRE_RENDER_SECONDS = 5; // 5 seconds of pre-rendering +let PRE_RENDER_BUFFER_SIZE = 150; // calculated as targetFps * 5 let codecDecoder = null; // Adaptive codec decoder (codec.js) let targetFps = 24; let frameInterval = 1000 / targetFps; @@ -185,42 +186,74 @@ function connectWebSocket() { codecDecoder = null; } - // ── AUDIO READY GATE ── - // Buffer video frames but don't render until audio is ready. - // This prevents the 0.5s initial stutter. + PRE_RENDER_BUFFER_SIZE = Math.ceil(targetFps * TARGET_PRE_RENDER_SECONDS); + + // ── PRE-RENDER / AUDIO READY GATE ── + // Buffer video frames for TARGET_PRE_RENDER_SECONDS before rendering + // to guarantee smooth streaming. readyToRender = false; - state = 'PLAYING'; + state = 'BUFFERING'; + statusEl.textContent = `Pre-rendering... 0%`; const beginRendering = () => { + state = 'PLAYING'; readyToRender = true; - streamStartTime = performance.now(); + streamStartTime = performance.now() - (frameBuffer[0]?.time * 1000 || 0); // sync start to first buffered frame lastRenderTime = performance.now(); lastFpsUpdate = lastRenderTime; requestAnimationFrame(renderFrame); }; + let audioReady = false; + if (audioEl) { audioEl.pause(); const qs = currentQueueIndex !== null ? `?v=${currentQueueIndex}&` : '?'; audioEl.src = `/audio${qs}t=${Date.now()}`; audioEl.volume = volumeSlider ? volumeSlider.value : 1.0; audioEl.load(); - audioEl.play().catch(() => {}); - // Wait for audio to actually start playing + const tryStart = () => { + if (audioReady && frameBuffer.length >= PRE_RENDER_BUFFER_SIZE && state === 'BUFFERING') { + audioEl.play().catch(() => {}); + beginRendering(); + } + }; + if (audioEl.readyState >= 3) { - beginRendering(); + audioReady = true; + tryStart(); } else { - audioEl.addEventListener('playing', beginRendering, { once: true }); - // Fallback: if audio fails to load (vol=0 / 204), start after 500ms + audioEl.addEventListener('canplay', () => { + audioReady = true; + tryStart(); + }, { once: true }); + + // Fallback: if audio fails to load (vol=0 / 204), mark as ready after 500ms setTimeout(() => { - if (!readyToRender) beginRendering(); + audioReady = true; + tryStart(); }, 500); } } else { - // No audio element at all → start immediately - beginRendering(); + audioReady = true; } + + // Interval to check if pre-rendering is complete + const bufferCheckInterval = setInterval(() => { + if (state !== 'BUFFERING') { + clearInterval(bufferCheckInterval); + return; + } + const percent = Math.min(100, Math.round((frameBuffer.length / PRE_RENDER_BUFFER_SIZE) * 100)); + statusEl.textContent = `Pre-rendering... ${percent}%`; + + if (audioReady && frameBuffer.length >= PRE_RENDER_BUFFER_SIZE) { + clearInterval(bufferCheckInterval); + beginRendering(); + } + }, 50); + return; } @@ -249,10 +282,11 @@ function connectWebSocket() { } } - while (frameBuffer.length > BUFFER_SIZE * 3) frameBuffer.shift(); + // Cap max buffer to 2x our pre-render requirement to prevent memory leaks + while (frameBuffer.length > PRE_RENDER_BUFFER_SIZE * 2) frameBuffer.shift(); }; - ws.onopen = () => { statusEl.textContent = 'Buffering...'; }; + ws.onopen = () => { statusEl.textContent = 'Connecting...'; }; ws.onclose = () => { if (state === 'PLAYING' || state === 'PAUSED') { @@ -289,13 +323,12 @@ function renderFrame(now) { if (frameBuffer.length === 0) return; // A/V Sync: Drop frames that are too far behind the master clock (catch up) - // Made more aggressive for lower latency live playback - while (frameBuffer.length > 1 && frameBuffer[0].time < masterClock - 0.05) { + while (frameBuffer.length > 1 && frameBuffer[0].time < masterClock - 0.1) { frameBuffer.shift(); } // A/V Sync: Wait if the frame is in the future - if (frameBuffer[0].time > masterClock + 0.02) { + if (frameBuffer[0].time > masterClock + 0.05) { return; } diff --git a/stream_server.py b/stream_server.py index d9e160a..1a8a739 100644 --- a/stream_server.py +++ b/stream_server.py @@ -15,6 +15,8 @@ import subprocess import json import numpy as np import cv2 +import queue +import threading from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse, StreamingResponse, FileResponse from fastapi.staticfiles import StaticFiles @@ -386,81 +388,111 @@ async def websocket_endpoint(websocket: WebSocket): bw_raw_bytes = 0 debug_mode = getattr(app.state, "debug", False) frame_index = 0 - prev_frame = None # previous framebuffer snapshot for delta coding - # Pre-allocate send buffer WITH header space to avoid per-frame concat - if pixel_mode: - # Zero-Copy Pixel: 4-byte header + raw BGR (3 bytes per pixel) - pixel_send_buf = bytearray(4 + rows * cols * 3) - elif render_mode > 1: - # ASCII Color: 4-byte header + [char,R,G,B] per pixel - ascii_send_buf = bytearray(4 + rows * cols * 4) + # ── ASYNC PRE-ENCODING BUFFER THREAD ── + # We encode frames ahead of time into a thread-safe queue. + # This allows the Python event loop to stream asynchronously without being blocked by OpenCV. + # We buffer up to 150 frames. + frame_queue = queue.Queue(maxsize=150) + encode_thread_active = True - raw_frame_num = 0 - try: - while True: - # ── FPS DECIMATION via grab() ── - # For 60→30 fps: grab (skip) 1 frame, then decode 1 frame. - # grab() is ~10x faster than read() because it skips decoding. - for _ in range(skip_n - 1): - if not decoder.grab(): - break # EOF reached during skip + def encode_loop(): + nonlocal decoder + prev_frame = None # previous framebuffer snapshot for delta coding - try: - gray_frame, bgr_frame = next(decoder) - except StopIteration: - break + # Pre-allocate send buffer WITH header space to avoid per-frame concat + if pixel_mode: + # Zero-Copy Pixel: 4-byte header + raw BGR (3 bytes per pixel) + pixel_send_buf = bytearray(4 + rows * cols * 3) + elif render_mode > 1: + # ASCII Color: 4-byte header + [char,R,G,B] per pixel + ascii_send_buf = bytearray(4 + rows * cols * 4) - if pixel_mode: - # ── PIXEL MODE: raw BGR (3 bytes/cell) ── - raw_size = 4 + rows * cols * 3 - if adaptive: - msg, prev_frame = encode_frame( - np.ascontiguousarray(bgr_frame), - prev_frame, frame_index, tolerance=tolerance) - await websocket.send_bytes(msg) - bw_bytes_sent += len(msg) - bw_raw_bytes += raw_size - else: - # ── ZERO-COPY PIXEL MODE (legacy) ── - struct.pack_into(">I", pixel_send_buf, 0, frame_index) - pixel_send_buf[4:] = bgr_frame.tobytes() - await websocket.send_bytes(bytes(pixel_send_buf)) - bw_bytes_sent += len(pixel_send_buf) - bw_raw_bytes += len(pixel_send_buf) - else: - indices = np.floor_divide(gray_frame, max(1, 256 // mapper._n)) - np.clip(indices, 0, mapper._n - 1, out=indices) + enc_frame_index = 0 + try: + while encode_thread_active: + # ── FPS DECIMATION via grab() ── + # For 60→30 fps: grab (skip) 1 frame, then decode 1 frame. + # grab() is ~10x faster than read() because it skips decoding. + for _ in range(skip_n - 1): + if not decoder.grab(): + break # EOF reached during skip - if render_mode == 1: - char_matrix = mapper._lut[indices] - lines = [''.join(row) for row in char_matrix] - payload = f"{frame_index}\n" + '\n'.join(lines) - await websocket.send_text(payload) - payload_size = len(payload.encode('utf-8')) - bw_bytes_sent += payload_size - bw_raw_bytes += payload_size - else: - char_codes = char_byte_lut[indices] - rgb = bgr_frame[:, :, ::-1] - if qb > 0: - rgb = (rgb >> qb) << qb - frame_buf[:, :, 0] = char_codes - frame_buf[:, :, 1:] = rgb - raw_size = 4 + rows * cols * 4 + try: + gray_frame, bgr_frame = next(decoder) + except StopIteration: + frame_queue.put((None, None, None)) # EOF marker + break + + if pixel_mode: + # ── PIXEL MODE: raw BGR (3 bytes/cell) ── + raw_size = 4 + rows * cols * 3 if adaptive: msg, prev_frame = encode_frame( - frame_buf, prev_frame, frame_index, - tolerance=tolerance) - await websocket.send_bytes(msg) - bw_bytes_sent += len(msg) - bw_raw_bytes += raw_size + np.ascontiguousarray(bgr_frame), + prev_frame, enc_frame_index, tolerance=tolerance) + frame_queue.put((msg, raw_size, True)) # (data, raw_size, is_binary) else: - struct.pack_into(">I", ascii_send_buf, 0, frame_index) - ascii_send_buf[4:] = frame_buf.tobytes() - await websocket.send_bytes(bytes(ascii_send_buf)) - bw_bytes_sent += len(ascii_send_buf) - bw_raw_bytes += len(ascii_send_buf) + # ── ZERO-COPY PIXEL MODE (legacy) ── + struct.pack_into(">I", pixel_send_buf, 0, enc_frame_index) + pixel_send_buf[4:] = bgr_frame.tobytes() + frame_queue.put((bytes(pixel_send_buf), raw_size, True)) + else: + indices = np.floor_divide(gray_frame, max(1, 256 // mapper._n)) + np.clip(indices, 0, mapper._n - 1, out=indices) + + if render_mode == 1: + char_matrix = mapper._lut[indices] + lines = [''.join(row) for row in char_matrix] + payload = f"{enc_frame_index}\n" + '\n'.join(lines) + payload_size = len(payload.encode('utf-8')) + frame_queue.put((payload, payload_size, False)) + else: + char_codes = char_byte_lut[indices] + rgb = bgr_frame[:, :, ::-1] + if qb > 0: + rgb = (rgb >> qb) << qb + frame_buf[:, :, 0] = char_codes + frame_buf[:, :, 1:] = rgb + raw_size = 4 + rows * cols * 4 + if adaptive: + msg, prev_frame = encode_frame( + frame_buf, prev_frame, enc_frame_index, + tolerance=tolerance) + frame_queue.put((msg, raw_size, True)) + else: + struct.pack_into(">I", ascii_send_buf, 0, enc_frame_index) + ascii_send_buf[4:] = frame_buf.tobytes() + frame_queue.put((bytes(ascii_send_buf), raw_size, True)) + + enc_frame_index += 1 + except Exception as e: + print(f"Encode thread error: {e}") + frame_queue.put((None, None, None)) + + encoder_thread = threading.Thread(target=encode_loop, daemon=True) + encoder_thread.start() + + try: + while True: + # Non-blocking get to yield back to event loop if queue is empty + try: + data, raw_size, is_binary = frame_queue.get_nowait() + except queue.Empty: + await asyncio.sleep(0.001) + continue + + if data is None: + break # EOF marker + + if is_binary: + await websocket.send_bytes(data) + bw_bytes_sent += len(data) + bw_raw_bytes += raw_size + else: + await websocket.send_text(data) + bw_bytes_sent += len(data.encode('utf-8')) + bw_raw_bytes += raw_size current_time = time.time() if debug_mode and current_time - bw_start_time >= 1.0: @@ -483,6 +515,7 @@ async def websocket_endpoint(websocket: WebSocket): frame_index += 1 finally: + encode_thread_active = False decoder.release() # Video finished → advance queue