From 0004abdc79f6471aaed1e0f979b14e4946cc3cc8 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 18:44:03 +0200 Subject: [PATCH] feat(podcasts): add audio renderer --- .../app/podcasts/rendering/__init__.py | 12 ++ .../app/podcasts/rendering/cache.py | 53 ++++++ .../app/podcasts/rendering/errors.py | 11 ++ .../app/podcasts/rendering/merge.py | 48 ++++++ .../app/podcasts/rendering/renderer.py | 157 ++++++++++++++++++ 5 files changed, 281 insertions(+) create mode 100644 surfsense_backend/app/podcasts/rendering/__init__.py create mode 100644 surfsense_backend/app/podcasts/rendering/cache.py create mode 100644 surfsense_backend/app/podcasts/rendering/errors.py create mode 100644 surfsense_backend/app/podcasts/rendering/merge.py create mode 100644 surfsense_backend/app/podcasts/rendering/renderer.py diff --git a/surfsense_backend/app/podcasts/rendering/__init__.py b/surfsense_backend/app/podcasts/rendering/__init__.py new file mode 100644 index 000000000..9fb50a2e1 --- /dev/null +++ b/surfsense_backend/app/podcasts/rendering/__init__.py @@ -0,0 +1,12 @@ +"""Rendering: synthesise and merge an approved transcript into audio. + +The :class:`PodcastRenderer` is the public entry point; the segment cache and +FFmpeg merge are implementation details it owns. +""" + +from __future__ import annotations + +from .errors import RenderError +from .renderer import PodcastRenderer, RenderedPodcast + +__all__ = ["PodcastRenderer", "RenderError", "RenderedPodcast"] diff --git a/surfsense_backend/app/podcasts/rendering/cache.py b/surfsense_backend/app/podcasts/rendering/cache.py new file mode 100644 index 000000000..32d9f0c21 --- /dev/null +++ b/surfsense_backend/app/podcasts/rendering/cache.py @@ -0,0 +1,53 @@ +"""Content-addressed cache for synthesised segments. + +Each segment's audio is keyed by everything that determines its bytes (voice, +language, speed, text). 
class SegmentCache:
    """On-disk store of segment audio, addressed by request content hash.

    Segments live directly under ``root`` as ``<key>.<container>``. Writes are
    atomic (temp file, then rename), so a worker that dies mid-write never
    leaves a truncated segment that a later render would mistake for a hit.
    """

    def __init__(self, root: Path) -> None:
        """Remember ``root`` and create it (with parents) if it is missing."""
        self._root = root
        self._root.mkdir(parents=True, exist_ok=True)

    def key(self, request: SynthesisRequest) -> str:
        """Return a stable SHA-256 hex digest of the inputs that shape the audio.

        Only voice, language, speed and text are hashed. The segment's position
        in the transcript is deliberately left out so identical lines share one
        cache entry.
        """
        material = json.dumps(
            {
                "voice": request.voice,
                "language": request.language,
                "speed": request.speed,
                "text": request.text,
            },
            # Sorted keys + ASCII-only output keep the serialisation (and so
            # the digest) independent of dict ordering and locale.
            sort_keys=True,
            ensure_ascii=True,
        )
        return hashlib.sha256(material.encode("utf-8")).hexdigest()

    def path(self, key: str, container: str) -> Path:
        """Return where the segment for ``key`` is (or would be) stored."""
        return self._root / f"{key}.{container}"

    def get(self, key: str, container: str) -> Path | None:
        """Return the cached segment path, or ``None`` on a miss."""
        path = self.path(key, container)
        # ``is_file`` rather than ``exists``: a stray directory with the same
        # name must not be handed to the merge step as if it were audio.
        return path if path.is_file() else None

    def put(self, key: str, container: str, data: bytes) -> Path:
        """Atomically write ``data`` for ``key`` and return its path.

        Raises:
            OSError: if the segment cannot be written or moved into place.
        """
        import uuid

        path = self.path(key, container)
        # The staging name ends in ``.tmp`` so it can never match the
        # ``<key>.<container>`` name that ``get`` looks up; the random part
        # keeps concurrent writers of the same key from sharing a temp file.
        staging = path.with_name(f".{path.name}.{uuid.uuid4().hex}.tmp")
        try:
            staging.write_bytes(data)
            # Rename within one directory is atomic: readers see either the
            # old complete file or the new complete file, never a partial one.
            staging.replace(path)
        finally:
            # No-op after a successful rename; removes debris after a failure.
            staging.unlink(missing_ok=True)
        return path
async def concat_to_mp3(segment_paths: list[Path], output_path: Path) -> None:
    """Merge ``segment_paths`` in order into ``output_path`` as MP3.

    A temporary concat list file is written next to ``output_path`` and is
    always removed afterwards, whether the merge succeeds or fails.

    Raises:
        RenderError: if ``segment_paths`` is empty, the list file cannot be
            written, or FFmpeg fails.
    """
    if not segment_paths:
        raise RenderError("cannot merge an empty list of segments")

    list_file = output_path.with_name(f"{output_path.stem}.concat.txt")

    try:
        # Writing the list is part of the merge: keeping it inside the ``try``
        # means a filesystem error surfaces as RenderError like every other
        # merge failure, and a half-written list is still cleaned up below.
        list_file.write_text(_concat_list(segment_paths), encoding="utf-8")
        ffmpeg = (
            FFmpeg()
            .option("y")  # overwrite a merged file left by an earlier render
            # safe=0 is passed because the list contains absolute paths.
            .input(str(list_file), f="concat", safe=0)
            .output(str(output_path), {"c:a": "libmp3lame"})
        )
        await ffmpeg.execute()
    except Exception as exc:  # noqa: BLE001 - normalise merge failures
        raise RenderError(f"audio merge failed: {exc}") from exc
    finally:
        list_file.unlink(missing_ok=True)


def _concat_list(segment_paths: list[Path]) -> str:
    """Build the concat-demuxer script: one ``file '<path>'`` line per segment.

    Paths are made absolute with ``Path.resolve``. A single quote inside a
    path is escaped as ``'\\''`` (close the quote, emit an escaped quote,
    reopen the quote).
    """
    lines = [
        "file '{}'".format(str(path.resolve()).replace("'", "'\\''"))
        for path in segment_paths
    ]
    return "\n".join(lines) + "\n"
+DEFAULT_MAX_CONCURRENCY = 4 + +_MERGED_FILENAME = "podcast.mp3" + + +@dataclass(frozen=True, slots=True) +class RenderedPodcast: + """The finished episode: encoded bytes plus their container.""" + + data: bytes + container: str + + +class PodcastRenderer: + """Synthesises and merges a transcript using one TTS provider.""" + + def __init__( + self, + *, + tts: TextToSpeech, + catalog: VoiceCatalog, + max_concurrency: int = DEFAULT_MAX_CONCURRENCY, + ) -> None: + self._tts = tts + self._catalog = catalog + self._max_concurrency = max_concurrency + + async def render( + self, + *, + spec: PodcastSpec, + transcript: Transcript, + workdir: Path, + ) -> RenderedPodcast: + """Produce the merged MP3 for ``transcript`` under ``spec``. + + ``workdir`` holds the segment cache and merge output; reusing the same + directory across renders is what makes voice edits cheap. + """ + cache = SegmentCache(workdir / "segments") + requests = [self._request_for(spec, turn) for turn in transcript.turns] + + # Concurrency primitives are created per render so each call is bound to + # the event loop running it (Celery tasks may use a fresh loop). 
+ synthesizer = _SegmentSynthesizer(self._tts, cache, self._max_concurrency) + segment_paths = await asyncio.gather( + *(synthesizer.segment(request) for request in requests) + ) + + output_path = workdir / _MERGED_FILENAME + await concat_to_mp3(list(segment_paths), output_path) + return RenderedPodcast(data=output_path.read_bytes(), container="mp3") + + def _request_for( + self, spec: PodcastSpec, turn: TranscriptTurn + ) -> SynthesisRequest: + try: + speaker = spec.speaker_for(turn.speaker) + except KeyError as exc: + raise RenderError( + f"transcript references unknown speaker slot {turn.speaker}" + ) from exc + try: + voice = self._catalog.get(speaker.voice_id) + except KeyError as exc: + raise RenderError(f"unknown voice {speaker.voice_id!r}") from exc + return SynthesisRequest( + text=turn.text, voice=voice.native_ref, language=spec.language + ) + + +class _SegmentSynthesizer: + """Per-render synthesis coordinator: caps concurrency and dedupes work. + + Beyond the on-disk cache (which serves cross-render reuse), this coalesces + identical segments that race within one render so the same line is voiced + once even when several turns request it simultaneously. 
+ """ + + def __init__( + self, tts: TextToSpeech, cache: SegmentCache, max_concurrency: int + ) -> None: + self._tts = tts + self._cache = cache + self._container = tts.container + self._semaphore = asyncio.Semaphore(max_concurrency) + self._inflight: dict[str, asyncio.Future[Path]] = {} + self._inflight_lock = asyncio.Lock() + + async def segment(self, request: SynthesisRequest) -> Path: + key = self._cache.key(request) + cached = self._cache.get(key, self._container) + if cached is not None: + return cached + + async with self._inflight_lock: + future = self._inflight.get(key) + owner = future is None + if owner: + future = asyncio.get_event_loop().create_future() + self._inflight[key] = future + + # The owner runs the work and publishes the outcome on the shared future; + # every caller (owner included) reads it back via ``await future`` so the + # result is retrieved exactly once-or-more and never left dangling. + if owner: + try: + path = await self._synthesize(request, key) + except BaseException as exc: # noqa: BLE001 - relayed to all waiters + future.set_exception(exc) + else: + future.set_result(path) + finally: + await self._forget(key) + + return await future + + async def _synthesize(self, request: SynthesisRequest, key: str) -> Path: + async with self._semaphore: + cached = self._cache.get(key, self._container) + if cached is not None: + return cached + try: + audio = await self._tts.synthesize(request) + except TextToSpeechError as exc: + raise RenderError(f"segment synthesis failed: {exc}") from exc + return self._cache.put(key, audio.container, audio.data) + + async def _forget(self, key: str) -> None: + async with self._inflight_lock: + self._inflight.pop(key, None)