feat(podcasts): add brief and transcript generation

This commit is contained in:
CREDO23 2026-06-10 18:44:03 +02:00
parent 0004abdc79
commit a3386cd5f9
19 changed files with 745 additions and 0 deletions

View file

@ -0,0 +1,20 @@
"""Generation: the LLM-driven brief and transcript controlled graphs.
Two small graphs hold all the intelligence: ``brief`` proposes a reviewable spec
(language detection + resolution), and ``transcript`` drafts long-form dialogue
outline-first. Everything else in the podcast pipeline is deterministic.
"""
from __future__ import annotations
from .brief import BriefConfig, BriefState, build_brief_graph
from .transcript import TranscriptConfig, TranscriptState, build_transcript_graph
__all__ = [
"BriefConfig",
"BriefState",
"TranscriptConfig",
"TranscriptState",
"build_brief_graph",
"build_transcript_graph",
]

View file

@ -0,0 +1,9 @@
"""Brief planning: propose a reviewable spec from weak signals."""
from __future__ import annotations
from .config import BriefConfig
from .graph import build_brief_graph
from .state import BriefState
__all__ = ["BriefConfig", "BriefState", "build_brief_graph"]

View file

@ -0,0 +1,31 @@
"""Configurable inputs for the brief-planning graph."""
from __future__ import annotations
from dataclasses import dataclass, field, fields
from langchain_core.runnables import RunnableConfig
# Sensible defaults for a fresh brief; the user adjusts the range at the gate.
DEFAULT_SPEAKER_COUNT = 2
DEFAULT_MIN_MINUTES = 10
DEFAULT_MAX_MINUTES = 20
@dataclass(kw_only=True)
class BriefConfig:
"""Signals used to propose a brief; everything here is non-LLM context."""
search_space_id: int
speaker_count: int = DEFAULT_SPEAKER_COUNT
min_minutes: int = DEFAULT_MIN_MINUTES
max_minutes: int = DEFAULT_MAX_MINUTES
focus: str | None = None
last_used_language: str | None = None
last_used_voices: list[str] = field(default_factory=list)
@classmethod
def from_runnable_config(cls, config: RunnableConfig | None = None) -> BriefConfig:
configurable = (config.get("configurable") or {}) if config else {}
names = {f.name for f in fields(cls) if f.init}
return cls(**{k: v for k, v in configurable.items() if k in names})

View file

@ -0,0 +1,28 @@
"""The language-detection reply shape, normalised to a safe tag or ``None``."""
from __future__ import annotations
from pydantic import BaseModel, field_validator
from app.podcasts.schemas import normalize_language_tag
class DetectedLanguage(BaseModel):
    """Reply shape for language detection: a normalised tag, or ``None``.

    Any value ``normalize_language_tag`` rejects with ``ValueError`` is stored
    as ``None``, so a bad detection defers to the rest of the resolution chain
    instead of putting an invalid tag into the spec.
    """

    language: str | None = None

    @field_validator("language")
    @classmethod
    def _normalise(cls, value: str | None) -> str | None:
        """Return the normalised tag, or ``None`` for a missing or rejected value."""
        if value is None:
            return value
        try:
            tag = normalize_language_tag(value)
        except ValueError:
            tag = None
        return tag

View file

@ -0,0 +1,27 @@
"""The brief-planning graph: detect language, then propose a spec."""
from __future__ import annotations
from langgraph.graph import StateGraph
from .config import BriefConfig
from .nodes import detect_language, propose_spec
from .state import BriefState
def build_brief_graph():
    """Compile the linear brief graph: ``detect_language`` then ``propose_spec``."""
    builder = StateGraph(BriefState, config_schema=BriefConfig)
    for node_name, node in (
        ("detect_language", detect_language),
        ("propose_spec", propose_spec),
    ):
        builder.add_node(node_name, node)
    # Strictly linear flow: each stage feeds the next one.
    stages = ("__start__", "detect_language", "propose_spec", "__end__")
    for source, target in zip(stages, stages[1:]):
        builder.add_edge(source, target)
    compiled = builder.compile()
    compiled.name = "Surfsense Podcast Brief"
    return compiled


# Module-level compiled instance, built once when the module is imported.
graph = build_brief_graph()

View file

@ -0,0 +1,153 @@
"""Brief-planning nodes: detect the language, then propose a full spec.
Only ``detect_language`` spends tokens, and only a small sample of source text;
``propose_spec`` is pure resolution. Together they open the brief gate pre-filled
so the common case needs no edits.
"""
from __future__ import annotations
from typing import Any
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from app.config import config as app_config
from app.podcasts.resolution import (
DEFAULT_LANGUAGE,
LanguageContext,
resolve_language,
resolve_voices,
)
from app.podcasts.schemas import (
DurationTarget,
PodcastSpec,
PodcastStyle,
SpeakerRole,
SpeakerSpec,
normalize_language_tag,
)
from app.podcasts.voices import (
VoiceCatalog,
TtsProvider,
get_voice_catalog,
provider_from_service,
)
from app.services.llm_service import get_agent_llm
from ..prompts import detect_language_prompt
from ..structured import StructuredOutputError, invoke_json
from .config import BriefConfig
from .detection import DetectedLanguage
from .state import BriefState
# Only the head of the source is needed to judge language; this caps tokens.
# Applied as a character slice of ``source_content`` in ``detect_language``.
_DETECTION_SAMPLE_CHARS = 4000
# Default role per speaker slot, indexed by slot number (slot 0 is the host).
# Extra speakers beyond the list fall back to guest (see ``_role_for``).
_ROLE_BY_SLOT = (
    SpeakerRole.HOST,
    SpeakerRole.GUEST,
    SpeakerRole.EXPERT,
    SpeakerRole.COHOST,
    SpeakerRole.NARRATOR,
)
async def detect_language(
    state: BriefState, config: RunnableConfig
) -> dict[str, Any]:
    """Detect the source language; defer (``None``) on any uncertainty.

    Returns a state update of the form ``{"detected_language": <tag or None>}``.
    ``None`` is returned when there is no source text to judge, when no agent
    LLM is available for the search space, or when the model reply cannot be
    parsed into ``DetectedLanguage``.
    """
    # Parse the config first so a malformed config still fails loudly.
    brief = BriefConfig.from_runnable_config(config)

    # Drop leading whitespace *before* slicing so padding at the head of the
    # source cannot use up the whole sample window.
    sample = (state.source_content or "").lstrip()[:_DETECTION_SAMPLE_CHARS].rstrip()
    if not sample:
        # Nothing to judge: skip the LLM lookup entirely.
        return {"detected_language": None}

    llm = await get_agent_llm(state.db_session, brief.search_space_id)
    if llm is None:
        return {"detected_language": None}

    messages = [
        SystemMessage(content=detect_language_prompt()),
        HumanMessage(content=f"<source_content>{sample}</source_content>"),
    ]
    try:
        detected = await invoke_json(llm, messages, DetectedLanguage)
    except StructuredOutputError:
        # An unparseable reply is treated the same as "unsure".
        return {"detected_language": None}
    return {"detected_language": detected.language}
def propose_spec(state: BriefState, config: RunnableConfig) -> dict[str, Any]:
    """Assemble a full :class:`PodcastSpec` from resolved defaults.

    No LLM call happens here: the language comes from ``_supported_language``,
    the voices from ``resolve_voices``, and each resolved voice becomes one
    speaker with a default name and role for its slot. Returns the state
    update ``{"spec": <PodcastSpec>}``.
    """
    brief = BriefConfig.from_runnable_config(config)
    provider = _active_provider()
    catalog = get_voice_catalog()

    language = _supported_language(
        detected=state.detected_language,
        last_used=brief.last_used_language,
        provider=provider,
        catalog=catalog,
    )
    voices = resolve_voices(
        catalog=catalog,
        provider=provider,
        language=language,
        speaker_count=brief.speaker_count,
        preferred=brief.last_used_voices,
    )

    # One speaker per resolved voice; the slot is the voice's position.
    speakers = []
    for slot, voice in enumerate(voices):
        speakers.append(
            SpeakerSpec(
                slot=slot,
                name=_default_name(slot),
                role=_role_for(slot),
                voice_id=voice.voice_id,
            )
        )

    duration = DurationTarget(
        min_minutes=brief.min_minutes, max_minutes=brief.max_minutes
    )
    return {
        "spec": PodcastSpec(
            language=language,
            style=PodcastStyle.CONVERSATIONAL,
            speakers=speakers,
            duration=duration,
            focus=brief.focus,
        )
    }
def _active_provider() -> TtsProvider:
    """Map the configured ``TTS_SERVICE`` to its provider.

    Raises ``ValueError`` when ``TTS_SERVICE`` is unset or empty.
    """
    configured = app_config.TTS_SERVICE
    if configured:
        return provider_from_service(configured)
    raise ValueError("TTS_SERVICE is not configured")
def _supported_language(
    *,
    detected: str | None,
    last_used: str | None,
    provider: TtsProvider,
    catalog: VoiceCatalog,
) -> str:
    """Resolve a language the active provider can actually voice.

    The resolved tag is normalised; a tag that fails normalisation, or one the
    catalog does not support for ``provider``, is replaced by
    ``DEFAULT_LANGUAGE``.
    """
    context = LanguageContext(detected=detected, last_used=last_used)
    resolved = resolve_language(context)
    try:
        candidate = normalize_language_tag(resolved)
    except ValueError:
        candidate = DEFAULT_LANGUAGE
    if catalog.supports_language(provider, candidate):
        return candidate
    return DEFAULT_LANGUAGE
def _role_for(slot: int) -> SpeakerRole:
    """Return the default role for ``slot``; slots past the table are guests."""
    if slot >= len(_ROLE_BY_SLOT):
        return SpeakerRole.GUEST
    return _ROLE_BY_SLOT[slot]
def _default_name(slot: int) -> str:
    """Return a display name derived from the slot's default role.

    Slots covered by ``_ROLE_BY_SLOT`` get the title-cased role label; slots
    beyond it get the label followed by the slot number.
    """
    label = _role_for(slot).value.replace("cohost", "co-host").title()
    if slot >= len(_ROLE_BY_SLOT):
        return f"{label} {slot}"
    return label

View file

@ -0,0 +1,19 @@
"""Mutable state threaded through the brief-planning graph."""
from __future__ import annotations
from dataclasses import dataclass
from sqlalchemy.ext.asyncio import AsyncSession
from app.podcasts.schemas import PodcastSpec
@dataclass
class BriefState:
    """Runtime inputs and the proposed spec the graph produces.

    ``db_session`` and ``source_content`` are required inputs; the remaining
    fields default to ``None`` and are filled in by the graph's nodes.
    """

    # Passed to ``get_agent_llm`` by the ``detect_language`` node.
    db_session: AsyncSession
    # Raw source text; ``detect_language`` only samples its head.
    source_content: str
    # Written by ``detect_language``; ``None`` means detection deferred.
    detected_language: str | None = None
    # Written by ``propose_spec``.
    spec: PodcastSpec | None = None

View file

@ -0,0 +1,15 @@
"""Prompt builders for the generation graphs."""
from __future__ import annotations
from .detect_language import detect_language_prompt
from .draft_segment import draft_segment_prompt
from .plan_outline import plan_outline_prompt
from .speakers import render_speaker_roster
__all__ = [
"detect_language_prompt",
"draft_segment_prompt",
"plan_outline_prompt",
"render_speaker_roster",
]

View file

@ -0,0 +1,22 @@
"""Prompt for detecting the dominant natural language of source content."""
from __future__ import annotations
# Static system prompt for language detection. It requests a strict-JSON reply
# with a single "language" key holding a tag or null.
_SYSTEM = """\
You identify the dominant natural language of a piece of source content for a \
podcast that will be generated from it.
Rules:
- Report the language the listener-facing podcast should be spoken in, i.e. the \
language most of the meaningful prose is written in.
- Ignore code, markup, URLs, numbers, and proper nouns when judging.
- If the content is too short, ambiguous, mixed without a clear majority, or not \
natural-language prose, return null rather than guessing.
Respond with strict JSON and nothing else:
{"language": "<BCP-47 tag like en, en-US, fr, pt-BR>"} or {"language": null}
"""


def detect_language_prompt() -> str:
    """Return the static system prompt used for language detection."""
    return _SYSTEM

View file

@ -0,0 +1,54 @@
"""Prompt for drafting one outline segment into dialogue turns.
Each segment is drafted on its own so long episodes stay coherent and within
context limits. A short recap of the preceding dialogue is passed in so the new
segment continues naturally instead of restarting. The model must write in the
episode language and attribute every line to a real speaker slot.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from app.podcasts.schemas import PodcastSpec
from .speakers import render_speaker_roster
if TYPE_CHECKING:
from app.podcasts.generation.transcript.planning import OutlineSegment
def draft_segment_prompt(
    *,
    spec: PodcastSpec,
    segment: OutlineSegment,
    position: int,
    total: int,
    recap: str | None,
) -> str:
    """Build the system prompt for drafting one outline segment.

    ``position`` and ``total`` are interpolated as "segment X of Y". A truthy
    ``recap`` is embedded as a continuation hint; otherwise the prompt states
    that this is the opening segment. The reply format requested is strict
    JSON with a ``turns`` list of ``speaker``/``text`` objects.
    """
    bullet_lines = [f"- {point}" for point in segment.talking_points]
    talking_points = "\n".join(bullet_lines)

    if recap:
        recap_block = (
            "\nRecap of the conversation so far (continue from here, do not repeat "
            f"it):\n{recap}\n"
        )
    else:
        recap_block = (
            "\nThis is the opening segment; begin the conversation naturally.\n"
        )

    roster = render_speaker_roster(spec)
    return f"""\
You are scripting natural, engaging podcast dialogue for segment {position} of \
{total}.
Write entirely in {spec.language}. The format is {spec.style.value}.
Speakers attribute every line using these exact slot numbers:
{roster}
{recap_block}
This segment is "{segment.title}". Cover these points using only facts grounded \
in the provided source content:
{talking_points}
Aim for about {segment.target_words} words of dialogue. Keep turns conversational \
and varied; speakers should react to each other rather than deliver monologues. \
Do not add greetings or sign-offs unless this is the first or last segment.
Respond with strict JSON and nothing else:
{{"turns": [{{"speaker": <slot>, "text": "..."}}]}}
"""

View file

@ -0,0 +1,47 @@
"""Prompt for planning a long-form podcast outline before drafting dialogue.
Outlining first is what makes long-form reliable: a single LLM call cannot hold
a coherent one- to two-hour script, but it can plan segments that are then
drafted independently against a shared plan. The prompt is told the target
length so the number and size of segments scale with the requested duration.
"""
from __future__ import annotations
from app.podcasts.schemas import PodcastSpec
from .speakers import render_speaker_roster
def plan_outline_prompt(
    *,
    spec: PodcastSpec,
    target_words: int,
    suggested_segments: int,
    focus: str | None,
) -> str:
    """Build the system prompt for planning the episode outline.

    ``target_words`` and ``suggested_segments`` tell the model how long the
    drafted episode should run. A truthy ``focus`` adds a block stating what
    the user asked the episode to focus on. The reply format requested is
    strict JSON with a ``segments`` list.
    """
    if focus:
        focus_block = f"\nThe user asked the episode to focus on:\n{focus}\n"
    else:
        focus_block = ""

    roster = render_speaker_roster(spec)
    return f"""\
You are a podcast showrunner planning the structure of an episode before any \
dialogue is written.
The episode language is {spec.language}. The format is {spec.style.value}.
Speakers (refer to them by these slots later):
{roster}
{focus_block}
Plan an outline that, when fully drafted, reaches roughly {target_words} words \
of spoken dialogue (about {suggested_segments} segments). Each segment is one \
coherent beat of the conversation: an opening, distinct topic areas grounded in \
the source content, and a closing.
For each segment provide:
- title: a short label for the beat
- talking_points: 2-5 concrete points to cover, drawn from the source content
- target_words: how many words of dialogue this segment should run (the sum \
across segments should approximate {target_words})
Respond with strict JSON and nothing else:
{{"segments": [{{"title": "...", "talking_points": ["..."], "target_words": 0}}]}}
"""

View file

@ -0,0 +1,18 @@
"""Render a spec's speaker roster for prompts.
The drafting prompts must reference speakers by the exact ``slot`` the renderer
expects, so this is the single place that formats that roster, keeping the
slot contract identical across every prompt that mentions speakers.
"""
from __future__ import annotations
from app.podcasts.schemas import PodcastSpec
def render_speaker_roster(spec: PodcastSpec) -> str:
    """Render one roster line per speaker, in spec order, for use in prompts.

    Each line has the form ``- slot <slot>: <name> (role: <role>)``. The
    explicit separator after the slot number keeps it from running into the
    speaker name, which would make the slot ambiguous whenever a name starts
    with a digit.

    Returns an empty string when the spec has no speakers.
    """
    return "\n".join(
        f"- slot {speaker.slot}: {speaker.name} (role: {speaker.role.value})"
        for speaker in spec.speakers
    )

View file

@ -0,0 +1,49 @@
"""Parse a model's reply into a Pydantic shape, tolerating chatty output.
Agent LLMs return JSON wrapped in prose, markdown fences, or reasoning blocks.
This mirrors the legacy podcaster's resilient parsing — strip fences, then fall
back to the outermost ``{...}`` span so every generation node validates the
reply the same way instead of repeating ad-hoc parsing.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, TypeVar
from pydantic import BaseModel, ValidationError
from app.utils.content_utils import extract_text_content, strip_markdown_fences
if TYPE_CHECKING:
from langchain_core.messages import BaseMessage
T = TypeVar("T", bound=BaseModel)
class StructuredOutputError(RuntimeError):
    """The model reply could not be parsed into the expected shape.

    Raised by ``invoke_json`` when the reply contains no ``{...}`` span, or
    when that span still fails validation against the target model.
    """
async def invoke_json(llm, messages: list[BaseMessage], model: type[T]) -> T:
    """Invoke ``llm`` and validate its reply as ``model``.

    The reply text has markdown fences stripped and is validated as-is first.
    If that fails, the span from the first ``{`` to the last ``}`` is
    validated instead.

    Raises:
        StructuredOutputError: if no such span exists, or if the span does not
            validate against ``model``.
    """
    reply = await llm.ainvoke(messages)
    text = strip_markdown_fences(extract_text_content(reply.content))

    # First attempt: the whole reply is already clean JSON.
    try:
        return model.model_validate_json(text)
    except (ValidationError, ValueError):
        pass

    # Fallback: cut out the outermost brace-delimited span.
    opening = text.find("{")
    closing = text.rfind("}")
    if opening < 0 or closing < opening:
        raise StructuredOutputError(
            f"no JSON object found for {model.__name__} in model reply"
        )
    try:
        return model.model_validate_json(text[opening : closing + 1])
    except (ValidationError, ValueError) as exc:
        raise StructuredOutputError(
            f"could not parse {model.__name__} from model reply"
        ) from exc

View file

@ -0,0 +1,17 @@
"""Transcript drafting: outline-first, long-form dialogue generation."""
from __future__ import annotations
from .config import TranscriptConfig
from .graph import build_transcript_graph
from .planning import Outline, OutlineSegment, SegmentDraft
from .state import TranscriptState
__all__ = [
"Outline",
"OutlineSegment",
"SegmentDraft",
"TranscriptConfig",
"TranscriptState",
"build_transcript_graph",
]

View file

@ -0,0 +1,26 @@
"""Configurable inputs for the transcript-drafting graph."""
from __future__ import annotations
from dataclasses import dataclass, fields
from langchain_core.runnables import RunnableConfig
from app.podcasts.schemas import PodcastSpec
@dataclass(kw_only=True)
class TranscriptConfig:
"""The approved spec and user focus that drive drafting."""
search_space_id: int
spec: PodcastSpec
focus: str | None = None
@classmethod
def from_runnable_config(
cls, config: RunnableConfig | None = None
) -> TranscriptConfig:
configurable = (config.get("configurable") or {}) if config else {}
names = {f.name for f in fields(cls) if f.init}
return cls(**{k: v for k, v in configurable.items() if k in names})

View file

@ -0,0 +1,29 @@
"""The transcript-drafting graph: outline, draft segments, finalize."""
from __future__ import annotations
from langgraph.graph import StateGraph
from .config import TranscriptConfig
from .nodes import draft_segments, finalize, plan_outline
from .state import TranscriptState
def build_transcript_graph():
    """Compile the linear transcript graph: outline, draft segments, finalize."""
    builder = StateGraph(TranscriptState, config_schema=TranscriptConfig)
    for node_name, node in (
        ("plan_outline", plan_outline),
        ("draft_segments", draft_segments),
        ("finalize", finalize),
    ):
        builder.add_node(node_name, node)
    # Strictly linear flow: each stage feeds the next one.
    stages = ("__start__", "plan_outline", "draft_segments", "finalize", "__end__")
    for source, target in zip(stages, stages[1:]):
        builder.add_edge(source, target)
    compiled = builder.compile()
    compiled.name = "Surfsense Podcast Transcript"
    return compiled


# Module-level compiled instance, built once when the module is imported.
graph = build_transcript_graph()

View file

@ -0,0 +1,127 @@
"""Transcript-drafting nodes: plan an outline, draft each beat, then assemble.
Long-form is produced beat-by-beat: a single call plans the structure, then each
segment is drafted on its own with a recap of what came before so the script
stays coherent without holding the whole episode in one context window.
"""
from __future__ import annotations
from typing import Any
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from app.podcasts.schemas import PodcastSpec, Transcript, TranscriptTurn
from app.services.llm_service import get_agent_llm
from ..prompts import draft_segment_prompt, plan_outline_prompt
from ..structured import invoke_json
from .config import TranscriptConfig
from .planning import Outline, OutlineSegment, SegmentDraft
from .state import TranscriptState
# Average speaking rate; converts target minutes to a target word count.
# ``plan_outline`` multiplies it by the spec's ``duration.midpoint_minutes``.
_WORDS_PER_MINUTE = 150
# Rough words per outline segment, used to suggest how many segments to plan.
# The suggestion is ``target_words / _WORDS_PER_SEGMENT``, rounded, at least 1.
_WORDS_PER_SEGMENT = 250
# Cap on source text sent per LLM call to bound tokens on large sources.
# Applied as a character slice from the start of the source in ``_source_block``.
_SOURCE_BUDGET_CHARS = 12000
# How much prior dialogue to recap into each segment for continuity.
# ``_recap`` keeps this many trailing characters of the rendered dialogue.
_RECAP_CHARS = 800
async def plan_outline(
    state: TranscriptState, config: RunnableConfig
) -> dict[str, Any]:
    """Ask the LLM for a segment outline sized to the spec's target duration.

    The word budget is the spec's midpoint duration times ``_WORDS_PER_MINUTE``;
    the suggested segment count is that budget divided by
    ``_WORDS_PER_SEGMENT``, never below one. Returns ``{"outline": <Outline>}``.
    """
    settings = TranscriptConfig.from_runnable_config(config)
    model = await _require_llm(state, settings)

    word_budget = round(settings.spec.duration.midpoint_minutes * _WORDS_PER_MINUTE)
    segment_count = round(word_budget / _WORDS_PER_SEGMENT)
    if segment_count < 1:
        segment_count = 1

    system_prompt = plan_outline_prompt(
        spec=settings.spec,
        target_words=word_budget,
        suggested_segments=segment_count,
        focus=settings.focus,
    )
    messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=_source_block(state.source_content)),
    ]
    return {"outline": await invoke_json(model, messages, Outline)}
async def draft_segments(
    state: TranscriptState, config: RunnableConfig
) -> dict[str, Any]:
    """Draft every outline segment in order, feeding each a recap of prior turns.

    Turns attributed to a slot the spec does not define are dropped before
    they are accumulated. Returns ``{"drafted_turns": <list of turns>}``.

    Raises:
        RuntimeError: if the state carries no outline.
    """
    settings = TranscriptConfig.from_runnable_config(config)
    model = await _require_llm(state, settings)

    plan = state.outline
    if plan is None:
        raise RuntimeError("draft_segments requires an outline")

    spec = settings.spec
    source_text = _source_block(state.source_content)
    segment_total = len(plan.segments)
    turns: list[TranscriptTurn] = []

    for position, segment in enumerate(plan.segments, start=1):
        system_prompt = draft_segment_prompt(
            spec=spec,
            segment=segment,
            position=position,
            total=segment_total,
            # The recap is rebuilt from everything drafted so far.
            recap=_recap(turns, spec),
        )
        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=source_text),
        ]
        draft = await invoke_json(model, messages, SegmentDraft)
        turns += _valid_turns(draft, spec)

    return {"drafted_turns": turns}
def finalize(state: TranscriptState, config: RunnableConfig) -> dict[str, Any]:
    """Wrap the drafted turns in a ``Transcript``.

    Returns ``{"transcript": <Transcript>}``.

    Raises:
        RuntimeError: if drafting left no turns to assemble.
    """
    drafted = state.drafted_turns
    if drafted:
        return {"transcript": Transcript(turns=drafted)}
    raise RuntimeError("drafting produced no usable dialogue")
async def _require_llm(state: TranscriptState, tc: TranscriptConfig):
    """Return the agent LLM for the search space, or raise ``RuntimeError`` if none."""
    llm = await get_agent_llm(state.db_session, tc.search_space_id)
    if llm is not None:
        return llm
    raise RuntimeError(
        f"no agent LLM configured for search space {tc.search_space_id}"
    )
def _source_block(source_content: str) -> str:
    """Wrap the head of the source (up to the char budget) in ``<source_content>`` tags."""
    head = (source_content or "")[:_SOURCE_BUDGET_CHARS]
    return "<source_content>{}</source_content>".format(head)
def _valid_turns(draft: SegmentDraft, spec: PodcastSpec) -> list[TranscriptTurn]:
# Drop any turn the model attributed to a slot the spec doesn't define, so a
# stray attribution can't break rendering downstream.
valid_slots = {speaker.slot for speaker in spec.speakers}
return [turn for turn in draft.turns if turn.speaker in valid_slots]
def _recap(turns: list[TranscriptTurn], spec: PodcastSpec) -> str | None:
    """Render the tail of the dialogue so far as ``name: text`` lines.

    Returns ``None`` when nothing has been drafted yet. Otherwise every turn is
    rendered (falling back to the raw slot when the spec has no name for it)
    and only the last ``_RECAP_CHARS`` characters are kept.
    """
    if not turns:
        return None
    name_by_slot = dict((speaker.slot, speaker.name) for speaker in spec.speakers)
    lines = []
    for turn in turns:
        label = name_by_slot.get(turn.speaker, turn.speaker)
        lines.append(f"{label}: {turn.text}")
    dialogue = "\n".join(lines)
    return dialogue[-_RECAP_CHARS:]

View file

@ -0,0 +1,32 @@
"""Internal shapes the transcript graph passes between its nodes.
These are generation-time artifacts (the outline and per-segment drafts), not
persisted or API-facing. Segment drafts reuse :class:`TranscriptTurn` so the
speaker-slot contract and turn validation are identical to the final transcript.
"""
from __future__ import annotations
from pydantic import BaseModel, Field
from app.podcasts.schemas import TranscriptTurn
class OutlineSegment(BaseModel):
    """One planned beat of the conversation, drafted independently."""

    # Short label for the beat; must be non-empty.
    title: str = Field(..., min_length=1)
    # Points to cover in this beat; defaults to an empty list when omitted.
    talking_points: list[str] = Field(default_factory=list)
    # Requested length of this beat in words; must be at least 1.
    target_words: int = Field(..., ge=1)
class Outline(BaseModel):
    """The full plan: ordered segments sized to the target duration."""

    # Required, and must contain at least one segment.
    segments: list[OutlineSegment] = Field(..., min_length=1)
class SegmentDraft(BaseModel):
    """The dialogue a single segment produced."""

    # Turns drafted for the segment; defaults to an empty list when omitted.
    turns: list[TranscriptTurn] = Field(default_factory=list)

View file

@ -0,0 +1,22 @@
"""Mutable state threaded through the transcript-drafting graph."""
from __future__ import annotations
from dataclasses import dataclass, field
from sqlalchemy.ext.asyncio import AsyncSession
from app.podcasts.schemas import Transcript, TranscriptTurn
from .planning import Outline
@dataclass
class TranscriptState:
    """Source content plus the intermediate and final drafting artifacts.

    ``db_session`` and ``source_content`` are required inputs; the remaining
    fields start empty and are filled in by the graph's nodes.
    """

    # Passed to ``get_agent_llm`` when a node needs the agent LLM.
    db_session: AsyncSession
    # Raw source text; nodes send only its head to the LLM.
    source_content: str
    # Written by ``plan_outline``; required by ``draft_segments``.
    outline: Outline | None = None
    # Written by ``draft_segments``; consumed by ``finalize``.
    drafted_turns: list[TranscriptTurn] = field(default_factory=list)
    # Written by ``finalize``.
    transcript: Transcript | None = None