feat: polished player UI for live mode (hover thumbnails, skip, responsive)

Builds on the existing live seek/play/volume. Adds a polished, responsive control bar with play/pause, +/-10s skip, a played-progress fill, and a YouTube-style hover thumbnail preview on the seek bar. Thumbnails come from a small lazy /scrub endpoint that builds an in-memory sprite once per video with a single ffmpeg pass (no disk cache); easy to point at the static compiler's sprite instead.
This commit is contained in:
Shaku-Med 2026-06-18 11:39:39 -04:00
parent 461e0bd939
commit a253c17507
4 changed files with 360 additions and 69 deletions

View file

@ -244,6 +244,95 @@ async def audio_stream(v: int | None = None, start: float = 0.0):
)
# ── Scrub-preview sprite (powers the hover thumbnails on the seek bar) ──
# A grid of small frames sampled across the video, like a YouTube preview strip.
# Built once per video on first request and kept in memory only (no disk cache).
# If you'd rather serve a sprite from the static compiler, just point /scrub at it.
_scrub_cache: dict = {} # video_path -> {"meta": {...}, "jpeg": bytes} or None
def _build_scrub_sprite(video_path: str, max_count: int = 64, cell_w: int = 160):
import math
# Probe size + duration quickly (metadata only, no frame decoding).
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return None
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
w0 = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h0 = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
duration = (total / fps) if fps else 0
if duration <= 0 or w0 <= 0 or h0 <= 0:
return None
cell_h = max(1, round(cell_w * h0 / w0))
n = max(1, min(max_count, int(duration))) # roughly one frame per second
cols = max(1, math.ceil(math.sqrt(n)))
rows = max(1, math.ceil(n / cols))
interval = duration / n
# One ffmpeg pass: sample frames, scale them, tile into a single grid image.
# Sequential decode (no per-frame seeking), so this is fast even on long clips.
vf = f"fps={n}/{duration:.3f},scale={cell_w}:{cell_h},tile={cols}x{rows}"
try:
proc = subprocess.run(
["ffmpeg", "-nostdin", "-i", video_path, "-vf", vf,
"-frames:v", "1", "-q:v", "4", "-f", "image2", "-c:v", "mjpeg",
"-loglevel", "error", "pipe:1"],
stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL, timeout=120,
)
except (FileNotFoundError, subprocess.TimeoutExpired):
return None
if proc.returncode != 0 or not proc.stdout:
return None
return {
"meta": {"available": True, "count": n, "gridCols": cols, "gridRows": rows,
"cellW": cell_w, "cellH": cell_h, "interval": interval, "duration": duration},
"jpeg": proc.stdout,
}
def _scrub_video_path(v: int | None) -> str:
queue = getattr(app.state, "queue", [])
idx = getattr(app.state, "current_index", 0)
if v is not None and 0 <= v < len(queue):
idx = v
entry = queue[idx] if queue and 0 <= idx < len(queue) else {}
return entry.get("video", "")
@app.get("/scrub")
async def scrub_meta(v: int | None = None):
"""Layout for the hover thumbnails. Builds the sprite lazily (off the event
loop) the first time it's asked for, then reuses it from memory."""
from fastapi import Response
import json as _json
video_path = _scrub_video_path(v)
if not video_path or not os.path.exists(video_path):
return Response(content='{"available": false}', media_type="application/json")
if video_path not in _scrub_cache:
loop = asyncio.get_event_loop()
_scrub_cache[video_path] = await loop.run_in_executor(None, _build_scrub_sprite, video_path)
built = _scrub_cache.get(video_path)
if not built:
return Response(content='{"available": false}', media_type="application/json")
meta = dict(built["meta"])
meta["sprite"] = f"/scrub_sprite?v={v if v is not None else 0}"
return Response(content=_json.dumps(meta), media_type="application/json")
@app.get("/scrub_sprite")
async def scrub_sprite(v: int | None = None):
from fastapi import Response, HTTPException
built = _scrub_cache.get(_scrub_video_path(v))
if not built:
raise HTTPException(status_code=404, detail="Not found")
return Response(content=built["jpeg"], media_type="image/jpeg")
def _origin_allowed(origin: str | None, host_header: str | None = None) -> bool:
"""Reject cross-site WebSocket hijacking while allowing localhost and LAN same-origin."""
if not origin: