refactor(podcasts): remove legacy podcaster agent, task, and schema

This commit is contained in:
CREDO23 2026-06-10 21:45:04 +02:00
parent 003d1d2b95
commit 97ab7a88fd
12 changed files with 2 additions and 821 deletions

View file

@ -1,8 +0,0 @@
"""New LangGraph Agent.
This module defines a custom graph.
"""
from .graph import graph
__all__ = ["graph"]

View file

@ -1,29 +0,0 @@
"""Define the configurable parameters for the agent."""
from __future__ import annotations
from dataclasses import dataclass, fields
from langchain_core.runnables import RunnableConfig
@dataclass(kw_only=True)
class Configuration:
"""The configuration for the agent."""
# Changeme: Add configurable values here!
# these values can be pre-set when you
# create assistants (https://langchain-ai.github.io/langgraph/cloud/how-tos/configuration_cloud/)
# and when you invoke the graph
podcast_title: str
search_space_id: int
user_prompt: str | None = None
@classmethod
def from_runnable_config(
cls, config: RunnableConfig | None = None
) -> Configuration:
"""Create a Configuration instance from a RunnableConfig object."""
configurable = (config.get("configurable") or {}) if config else {}
_fields = {f.name for f in fields(cls) if f.init}
return cls(**{k: v for k, v in configurable.items() if k in _fields})

View file

@ -1,29 +0,0 @@
from langgraph.graph import StateGraph
from .configuration import Configuration
from .nodes import create_merged_podcast_audio, create_podcast_transcript
from .state import State
def build_graph():
# Define a new graph
workflow = StateGraph(State, config_schema=Configuration)
# Add the node to the graph
workflow.add_node("create_podcast_transcript", create_podcast_transcript)
workflow.add_node("create_merged_podcast_audio", create_merged_podcast_audio)
# Set the entrypoint as `call_model`
workflow.add_edge("__start__", "create_podcast_transcript")
workflow.add_edge("create_podcast_transcript", "create_merged_podcast_audio")
workflow.add_edge("create_merged_podcast_audio", "__end__")
# Compile the workflow into an executable graph
graph = workflow.compile()
graph.name = "Surfsense Podcaster" # This defines the custom name in LangSmith
return graph
# Compile the graph once when the module is loaded
graph = build_graph()

View file

@ -1,195 +0,0 @@
import asyncio
import json
import os
import uuid
from pathlib import Path
from typing import Any
from ffmpeg.asyncio import FFmpeg
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from litellm import aspeech
from app.config import config as app_config
from app.services.kokoro_tts_service import get_kokoro_tts_service
from app.services.llm_service import get_agent_llm
from app.utils.content_utils import extract_text_content, strip_markdown_fences
from .configuration import Configuration
from .prompts import get_podcast_generation_prompt
from .state import PodcastTranscriptEntry, PodcastTranscripts, State
from .utils import get_voice_for_provider
async def create_podcast_transcript(
state: State, config: RunnableConfig
) -> dict[str, Any]:
"""Generate the podcast transcript from the source content."""
configuration = Configuration.from_runnable_config(config)
search_space_id = configuration.search_space_id
user_prompt = configuration.user_prompt
llm = await get_agent_llm(state.db_session, search_space_id)
if not llm:
error_message = f"No agent LLM configured for search space {search_space_id}"
print(error_message)
raise RuntimeError(error_message)
prompt = get_podcast_generation_prompt(user_prompt)
messages = [
SystemMessage(content=prompt),
HumanMessage(
content=f"<source_content>{state.source_content}</source_content>"
),
]
llm_response = await llm.ainvoke(messages)
# Reasoning models may return content as blocks; normalise to a string.
content = strip_markdown_fences(extract_text_content(llm_response.content))
try:
podcast_transcript = PodcastTranscripts.model_validate(json.loads(content))
except (json.JSONDecodeError, TypeError, ValueError) as e:
print(f"Direct JSON parsing failed, trying fallback approach: {e!s}")
try:
json_start = content.find("{")
json_end = content.rfind("}") + 1
if json_start >= 0 and json_end > json_start:
json_str = content[json_start:json_end]
parsed_data = json.loads(json_str)
podcast_transcript = PodcastTranscripts.model_validate(parsed_data)
print("Successfully parsed podcast transcript using fallback approach")
else:
error_message = f"Could not find valid JSON in LLM response. Raw response: {content}"
print(error_message)
raise ValueError(error_message)
except (json.JSONDecodeError, TypeError, ValueError) as e2:
error_message = f"Error parsing LLM response (fallback also failed): {e2!s}"
print(f"Error parsing LLM response: {e2!s}")
print(f"Raw response: {content}")
raise
return {"podcast_transcript": podcast_transcript.podcast_transcripts}
async def create_merged_podcast_audio(
state: State, config: RunnableConfig
) -> dict[str, Any]:
"""Generate audio for each transcript and merge them into a single podcast file."""
starting_transcript = PodcastTranscriptEntry(
speaker_id=1, dialog="Welcome to Surfsense Podcast."
)
transcript = state.podcast_transcript
# transcript may be a PodcastTranscripts object or already a list.
if hasattr(transcript, "podcast_transcripts"):
transcript_entries = transcript.podcast_transcripts
else:
transcript_entries = transcript
merged_transcript = [starting_transcript, *transcript_entries]
temp_dir = Path("temp_audio")
temp_dir.mkdir(exist_ok=True)
session_id = str(uuid.uuid4())
output_path = f"podcasts/{session_id}_podcast.mp3"
os.makedirs("podcasts", exist_ok=True)
audio_files = []
async def generate_speech_for_segment(segment, index):
if hasattr(segment, "speaker_id"):
speaker_id = segment.speaker_id
dialog = segment.dialog
else:
speaker_id = segment.get("speaker_id", 0)
dialog = segment.get("dialog", "")
voice = get_voice_for_provider(app_config.TTS_SERVICE, speaker_id)
if app_config.TTS_SERVICE == "local/kokoro":
filename = f"{temp_dir}/{session_id}_{index}.wav"
else:
filename = f"{temp_dir}/{session_id}_{index}.mp3"
try:
if app_config.TTS_SERVICE == "local/kokoro":
kokoro_service = await get_kokoro_tts_service(
lang_code="a"
) # American English
audio_path = await kokoro_service.generate_speech(
text=dialog, voice=voice, speed=1.0, output_path=filename
)
return audio_path
else:
if app_config.TTS_SERVICE_API_BASE:
response = await aspeech(
model=app_config.TTS_SERVICE,
api_base=app_config.TTS_SERVICE_API_BASE,
api_key=app_config.TTS_SERVICE_API_KEY,
voice=voice,
input=dialog,
max_retries=2,
timeout=600,
)
else:
response = await aspeech(
model=app_config.TTS_SERVICE,
api_key=app_config.TTS_SERVICE_API_KEY,
voice=voice,
input=dialog,
max_retries=2,
timeout=600,
)
with open(filename, "wb") as f:
f.write(response.content)
return filename
except Exception as e:
print(f"Error generating speech for segment {index}: {e!s}")
raise
tasks = [
generate_speech_for_segment(segment, i)
for i, segment in enumerate(merged_transcript)
]
audio_files = await asyncio.gather(*tasks)
try:
ffmpeg = FFmpeg().option("y")
for audio_file in audio_files:
ffmpeg = ffmpeg.input(audio_file)
filter_complex = []
for i in range(len(audio_files)):
filter_complex.append(f"[{i}:0]")
filter_complex_str = (
"".join(filter_complex) + f"concat=n={len(audio_files)}:v=0:a=1[outa]"
)
ffmpeg = ffmpeg.option("filter_complex", filter_complex_str)
ffmpeg = ffmpeg.output(output_path, map="[outa]")
await ffmpeg.execute()
print(f"Successfully created podcast audio: {output_path}")
except Exception as e:
print(f"Error merging audio files: {e!s}")
raise
finally:
for audio_file in audio_files:
try:
os.remove(audio_file)
except Exception as e:
print(f"Error removing audio file {audio_file}: {e!s}")
pass
return {
"podcast_transcript": merged_transcript,
"final_podcast_file_path": output_path,
}

View file

@ -1,122 +0,0 @@
import datetime
def get_podcast_generation_prompt(user_prompt: str | None = None):
return f"""
Today's date: {datetime.datetime.now().strftime("%Y-%m-%d")}
<podcast_generation_system>
You are a master podcast scriptwriter, adept at transforming diverse input content into a lively, engaging, and natural-sounding conversation between two distinct podcast hosts. Your primary objective is to craft authentic, flowing dialogue that captures the spontaneity and chemistry of a real podcast discussion, completely avoiding any hint of robotic scripting or stiff formality. Think dynamic interplay, not just information delivery.
{
f'''
You **MUST** strictly adhere to the following user instruction while generating the podcast script:
<user_instruction>
{user_prompt}
</user_instruction>
'''
if user_prompt
else ""
}
<input>
- '<source_content>': A block of text containing the information to be discussed in the podcast. This could be research findings, an article summary, a detailed outline, user chat history related to the topic, or any other relevant raw information. The content might be unstructured but serves as the factual basis for the podcast dialogue.
</input>
<output_format>
A JSON object containing the podcast transcript with alternating speakers:
{{
"podcast_transcripts": [
{{
"speaker_id": 0,
"dialog": "Speaker 0 dialog here"
}},
{{
"speaker_id": 1,
"dialog": "Speaker 1 dialog here"
}},
{{
"speaker_id": 0,
"dialog": "Speaker 0 dialog here"
}},
{{
"speaker_id": 1,
"dialog": "Speaker 1 dialog here"
}}
]
}}
</output_format>
<guidelines>
1. **Establish Distinct & Consistent Host Personas:**
* **Speaker 0 (Lead Host):** Drives the conversation forward, introduces segments, poses key questions derived from the source content, and often summarizes takeaways. Maintain a guiding, clear, and engaging tone.
* **Speaker 1 (Co-Host/Expert):** Offers deeper insights, provides alternative viewpoints or elaborations on the source content, asks clarifying or challenging questions, and shares relevant anecdotes or examples. Adopt a complementary tone (e.g., analytical, enthusiastic, reflective, slightly skeptical).
* **Consistency is Key:** Ensure each speaker maintains their distinct voice, vocabulary choice, sentence structure, and perspective throughout the entire script. Avoid having them sound interchangeable. Their interaction should feel like a genuine partnership.
2. **Craft Natural & Dynamic Dialogue:**
* **Emulate Real Conversation:** Use contractions (e.g., "don't", "it's"), interjections ("Oh!", "Wow!", "Hmm"), discourse markers ("you know", "right?", "well"), and occasional natural pauses or filler words. Avoid overly formal language or complex sentence structures typical of written text.
* **Foster Interaction & Chemistry:** Write dialogue where speakers genuinely react *to each other*. They should build on points ("Exactly, and that reminds me..."), ask follow-up questions ("Could you expand on that?"), express agreement/disagreement respectfully ("That's a fair point, but have you considered...?"), and show active listening.
* **Vary Rhythm & Pace:** Mix short, punchy lines with longer, more explanatory ones. Vary sentence beginnings. Use questions to break up exposition. The rhythm should feel spontaneous, not monotonous.
* **Inject Personality & Relatability:** Allow for appropriate humor, moments of surprise or curiosity, brief personal reflections ("I actually experienced something similar..."), or relatable asides that fit the hosts' personas and the topic. Lightly reference past discussions if it enhances context ("Remember last week when we touched on...?").
3. **Structure for Flow and Listener Engagement:**
* **Natural Beginning:** Start with dialogue that flows naturally after an introduction (which will be added manually). Avoid redundant greetings or podcast name mentions since these will be added separately.
* **Logical Progression & Signposting:** Guide the listener through the information smoothly. Use clear transitions to link different ideas or segments ("So, now that we've covered X, let's dive into Y...", "That actually brings me to another key finding..."). Ensure topics flow logically from one to the next.
* **Meaningful Conclusion:** Summarize the key takeaways or main points discussed, reinforcing the core message derived from the source content. End with a final thought, a lingering question for the audience, or a brief teaser for what's next, providing a sense of closure. Avoid abrupt endings.
4. **Integrate Source Content Seamlessly & Accurately:**
* **Translate, Don't Recite:** Rephrase information from the `<source_content>` into conversational language suitable for each host's persona. Avoid directly copying dense sentences or technical jargon without explanation. The goal is discussion, not narration.
* **Explain & Contextualize:** Use analogies, simple examples, storytelling, or have one host ask clarifying questions (acting as a listener surrogate) to break down complex ideas from the source.
* **Weave Information Naturally:** Integrate facts, data, or key points from the source *within* the dialogue, not as standalone, undigested blocks. Attribute information conversationally where appropriate ("The research mentioned...", "Apparently, the key factor is...").
* **Balance Depth & Accessibility:** Ensure the conversation is informative and factually accurate based on the source content, but prioritize clear communication and engaging delivery over exhaustive technical detail. Make it understandable and interesting for a general audience.
5. **Length & Pacing:**
* **Six-Minute Duration:** Create a transcript that, when read at a natural speaking pace, would result in approximately 6 minutes of audio. Typically, this means around 1000 words total (based on average speaking rate of 150 words per minute).
* **Concise Speaking Turns:** Keep most speaking turns relatively brief and focused. Aim for a natural back-and-forth rhythm rather than extended monologues.
* **Essential Content Only:** Prioritize the most important information from the source content. Focus on quality over quantity, ensuring every line contributes meaningfully to the topic.
</guidelines>
<examples>
Input: "Quantum computing uses quantum bits or qubits which can exist in multiple states simultaneously due to superposition."
Output:
{{
"podcast_transcripts": [
{{
"speaker_id": 0,
"dialog": "Today we're diving into the mind-bending world of quantum computing. You know, this is a topic I've been excited to cover for weeks."
}},
{{
"speaker_id": 1,
"dialog": "Same here! And I know our listeners have been asking for it. But I have to admit, the concept of quantum computing makes my head spin a little. Can we start with the basics?"
}},
{{
"speaker_id": 0,
"dialog": "Absolutely. So regular computers use bits, right? Little on-off switches that are either 1 or 0. But quantum computers use something called qubits, and this is where it gets fascinating."
}},
{{
"speaker_id": 1,
"dialog": "Wait, what makes qubits so special compared to regular bits?"
}},
{{
"speaker_id": 0,
"dialog": "The magic is in something called superposition. These qubits can exist in multiple states at the same time, not just 1 or 0."
}},
{{
"speaker_id": 1,
"dialog": "That sounds impossible! How would you even picture that?"
}},
{{
"speaker_id": 0,
"dialog": "Think of it like a coin spinning in the air. Before it lands, is it heads or tails?"
}},
{{
"speaker_id": 1,
"dialog": "Well, it's... neither? Or I guess both, until it lands? Oh, I think I see where you're going with this."
}}
]
}}
</examples>
Transform the source material into a lively and engaging podcast conversation. Craft dialogue that showcases authentic host chemistry and natural interaction (including occasional disagreement, building on points, or asking follow-up questions). Use varied speech patterns reflecting real human conversation, ensuring the final script effectively educates *and* entertains the listener while keeping within a 5-minute audio duration.
</podcast_generation_system>
"""

View file

@ -1,43 +0,0 @@
"""Define the state structures for the agent."""
from __future__ import annotations
from dataclasses import dataclass
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
class PodcastTranscriptEntry(BaseModel):
"""
Represents a single entry in a podcast transcript.
"""
speaker_id: int = Field(..., description="The ID of the speaker (0 or 1)")
dialog: str = Field(..., description="The dialog text spoken by the speaker")
class PodcastTranscripts(BaseModel):
"""
Represents the full podcast transcript structure.
"""
podcast_transcripts: list[PodcastTranscriptEntry] = Field(
..., description="List of transcript entries with alternating speakers"
)
@dataclass
class State:
"""Defines the input state for the agent, representing a narrower interface to the outside world.
This class is used to define the initial state and structure of incoming data.
See: https://langchain-ai.github.io/langgraph/concepts/low_level/#state
for more information.
"""
# Runtime context
db_session: AsyncSession
source_content: str
podcast_transcript: list[PodcastTranscriptEntry] | None = None
final_podcast_file_path: str | None = None

View file

@ -1,84 +0,0 @@
def get_voice_for_provider(provider: str, speaker_id: int) -> dict | str:
"""
Get the appropriate voice configuration based on the TTS provider and speaker ID.
Args:
provider: The TTS provider (e.g., "openai/tts-1", "vertex_ai/test")
speaker_id: The ID of the speaker (0-5)
Returns:
Voice configuration - string for OpenAI, dict for Vertex AI
"""
if provider == "local/kokoro":
# Kokoro voice mapping - https://huggingface.co/hexgrad/Kokoro-82M/tree/main/voices
kokoro_voices = {
0: "am_adam", # Default/intro voice
1: "af_bella", # First speaker
}
return kokoro_voices.get(speaker_id, "af_heart")
# Extract provider type from the model string
provider_type = (
provider.split("/")[0].lower() if "/" in provider else provider.lower()
)
if provider_type == "openai":
# OpenAI voice mapping - simple string values
openai_voices = {
0: "alloy", # Default/intro voice
1: "echo", # First speaker
2: "fable", # Second speaker
3: "onyx", # Third speaker
4: "nova", # Fourth speaker
5: "shimmer", # Fifth speaker
}
return openai_voices.get(speaker_id, "alloy")
elif provider_type == "vertex_ai":
# Vertex AI voice mapping - dict with languageCode and name
vertex_voices = {
0: {
"languageCode": "en-US",
"name": "en-US-Studio-O",
},
1: {
"languageCode": "en-US",
"name": "en-US-Studio-M",
},
2: {
"languageCode": "en-UK",
"name": "en-UK-Studio-A",
},
3: {
"languageCode": "en-UK",
"name": "en-UK-Studio-B",
},
4: {
"languageCode": "en-AU",
"name": "en-AU-Studio-A",
},
5: {
"languageCode": "en-AU",
"name": "en-AU-Studio-B",
},
}
return vertex_voices.get(speaker_id, vertex_voices[0])
elif provider_type == "azure":
# OpenAI voice mapping - simple string values
azure_voices = {
0: "alloy", # Default/intro voice
1: "echo", # First speaker
2: "fable", # Second speaker
3: "onyx", # Third speaker
4: "nova", # Fourth speaker
5: "shimmer", # Fifth speaker
}
return azure_voices.get(speaker_id, "alloy")
else:
# Default fallback to OpenAI format for unknown providers
default_voices = {
0: {},
1: {},
}
return default_voices.get(speaker_id, default_voices[0])

View file

@ -1,8 +1,7 @@
"""Video Presentation LangGraph Agent.
This module defines a graph for generating video presentations
from source content, similar to the podcaster agent but producing
slide-based video presentations with TTS narration.
This module defines a graph for generating slide-based video presentations
from source content, with TTS narration per slide.
"""
from .graph import graph

View file

@ -181,7 +181,6 @@ celery_app = Celery(
backend=CELERY_RESULT_BACKEND,
include=[
"app.tasks.celery_tasks.document_tasks",
"app.tasks.celery_tasks.podcast_tasks",
"app.podcasts.tasks.draft",
"app.podcasts.tasks.render",
"app.tasks.celery_tasks.video_presentation_tasks",

View file

@ -68,7 +68,6 @@ from .new_llm_config import (
NewLLMConfigRead,
NewLLMConfigUpdate,
)
from .podcasts import PodcastBase, PodcastCreate, PodcastRead, PodcastUpdate
from .rbac_schemas import (
InviteAcceptRequest,
InviteAcceptResponse,
@ -232,10 +231,6 @@ __all__ = [
"PermissionInfo",
"PermissionsListResponse",
# Podcast schemas
"PodcastBase",
"PodcastCreate",
"PodcastRead",
"PodcastUpdate",
"RefreshTokenRequest",
"RefreshTokenResponse",
# Report schemas

View file

@ -1,66 +0,0 @@
"""Podcast schemas for API responses."""
from datetime import datetime
from enum import StrEnum
from typing import Any
from pydantic import BaseModel
class PodcastStatusEnum(StrEnum):
PENDING = "pending"
GENERATING = "generating"
READY = "ready"
FAILED = "failed"
class PodcastBase(BaseModel):
"""Base podcast schema."""
title: str
podcast_transcript: list[dict[str, Any]] | None = None
file_location: str | None = None
search_space_id: int
class PodcastCreate(PodcastBase):
"""Schema for creating a podcast."""
pass
class PodcastUpdate(BaseModel):
"""Schema for updating a podcast."""
title: str | None = None
podcast_transcript: list[dict[str, Any]] | None = None
file_location: str | None = None
class PodcastRead(PodcastBase):
"""Schema for reading a podcast."""
id: int
status: PodcastStatusEnum = PodcastStatusEnum.READY
created_at: datetime
transcript_entries: int | None = None
class Config:
from_attributes = True
@classmethod
def from_orm_with_entries(cls, obj):
"""Create PodcastRead with transcript_entries computed."""
data = {
"id": obj.id,
"title": obj.title,
"podcast_transcript": obj.podcast_transcript,
"file_location": obj.file_location,
"search_space_id": obj.search_space_id,
"status": obj.status,
"created_at": obj.created_at,
"transcript_entries": len(obj.podcast_transcript)
if obj.podcast_transcript
else None,
}
return cls(**data)

View file

@ -1,236 +0,0 @@
"""Celery tasks for podcast generation."""
import asyncio
import logging
import sys
from contextlib import asynccontextmanager
from sqlalchemy import select
from app.agents.podcaster.graph import graph as podcaster_graph
from app.agents.podcaster.state import State as PodcasterState
from app.celery_app import celery_app
from app.config import config as app_config
from app.db import Podcast, PodcastStatus
from app.services.billable_calls import (
BillingSettlementError,
QuotaInsufficientError,
_resolve_agent_billing_for_search_space,
billable_call,
)
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
logger = logging.getLogger(__name__)
if sys.platform.startswith("win"):
try:
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
except AttributeError:
logger.warning(
"WindowsProactorEventLoopPolicy is unavailable; async subprocess support may fail."
)
# =============================================================================
# Content-based podcast generation (for new-chat)
# =============================================================================
@asynccontextmanager
async def _celery_billable_session():
"""Session factory used by billable_call inside the Celery worker loop."""
async with get_celery_session_maker()() as session:
yield session
@celery_app.task(name="generate_content_podcast", bind=True)
def generate_content_podcast_task(
self,
podcast_id: int,
source_content: str,
search_space_id: int,
user_prompt: str | None = None,
) -> dict:
"""
Celery task to generate podcast from source content.
Updates existing podcast record created by the tool.
"""
try:
return run_async_celery_task(
lambda: _generate_content_podcast(
podcast_id,
source_content,
search_space_id,
user_prompt,
)
)
except Exception as e:
logger.error(f"Error generating content podcast: {e!s}")
try:
run_async_celery_task(lambda: _mark_podcast_failed(podcast_id))
except Exception:
logger.exception("Failed to mark podcast %s as failed", podcast_id)
return {"status": "failed", "podcast_id": podcast_id}
async def _mark_podcast_failed(podcast_id: int) -> None:
"""Mark a podcast as failed in the database."""
async with get_celery_session_maker()() as session:
try:
result = await session.execute(
select(Podcast).filter(Podcast.id == podcast_id)
)
podcast = result.scalars().first()
if podcast:
podcast.status = PodcastStatus.FAILED
await session.commit()
except Exception as e:
logger.error(f"Failed to mark podcast as failed: {e}")
async def _generate_content_podcast(
podcast_id: int,
source_content: str,
search_space_id: int,
user_prompt: str | None = None,
) -> dict:
"""Generate content-based podcast and update existing record."""
async with get_celery_session_maker()() as session:
result = await session.execute(select(Podcast).filter(Podcast.id == podcast_id))
podcast = result.scalars().first()
if not podcast:
raise ValueError(f"Podcast {podcast_id} not found")
try:
podcast.status = PodcastStatus.GENERATING
await session.commit()
try:
(
owner_user_id,
billing_tier,
base_model,
) = await _resolve_agent_billing_for_search_space(
session,
search_space_id,
thread_id=podcast.thread_id,
)
except ValueError as resolve_err:
logger.error(
"Podcast %s: cannot resolve billing for search_space=%s: %s",
podcast.id,
search_space_id,
resolve_err,
)
podcast.status = PodcastStatus.FAILED
await session.commit()
return {
"status": "failed",
"podcast_id": podcast.id,
"reason": "billing_resolution_failed",
}
graph_config = {
"configurable": {
"podcast_title": podcast.title,
"search_space_id": search_space_id,
"user_prompt": user_prompt,
}
}
initial_state = PodcasterState(
source_content=source_content,
db_session=session,
)
try:
async with billable_call(
user_id=owner_user_id,
search_space_id=search_space_id,
billing_tier=billing_tier,
base_model=base_model,
quota_reserve_micros_override=app_config.QUOTA_DEFAULT_PODCAST_RESERVE_MICROS,
usage_type="podcast_generation",
call_details={
"podcast_id": podcast.id,
"title": podcast.title,
"thread_id": podcast.thread_id,
},
billable_session_factory=_celery_billable_session,
):
graph_result = await podcaster_graph.ainvoke(
initial_state, config=graph_config
)
except QuotaInsufficientError as exc:
logger.info(
"Podcast %s denied: out of premium credits "
"(used=%d/%d remaining=%d)",
podcast.id,
exc.used_micros,
exc.limit_micros,
exc.remaining_micros,
)
podcast.status = PodcastStatus.FAILED
await session.commit()
return {
"status": "failed",
"podcast_id": podcast.id,
"reason": "premium_quota_exhausted",
}
except BillingSettlementError:
logger.exception(
"Podcast %s: premium billing settlement failed",
podcast.id,
)
podcast.status = PodcastStatus.FAILED
await session.commit()
return {
"status": "failed",
"podcast_id": podcast.id,
"reason": "billing_settlement_failed",
}
podcast_transcript = graph_result.get("podcast_transcript", [])
file_path = graph_result.get("final_podcast_file_path", "")
serializable_transcript = []
for entry in podcast_transcript:
if hasattr(entry, "speaker_id"):
serializable_transcript.append(
{"speaker_id": entry.speaker_id, "dialog": entry.dialog}
)
else:
serializable_transcript.append(
{
"speaker_id": entry.get("speaker_id", 0),
"dialog": entry.get("dialog", ""),
}
)
podcast.podcast_transcript = serializable_transcript
podcast.file_location = file_path
podcast.status = PodcastStatus.READY
logger.info(
"Podcast %s: committing READY transcript_entries=%d file=%s",
podcast.id,
len(serializable_transcript),
file_path,
)
await session.commit()
logger.info("Podcast %s: READY commit complete", podcast.id)
logger.info(f"Successfully generated podcast: {podcast.id}")
return {
"status": "ready",
"podcast_id": podcast.id,
"title": podcast.title,
"transcript_entries": len(serializable_transcript),
}
except Exception as e:
logger.error(f"Error in _generate_content_podcast: {e!s}")
podcast.status = PodcastStatus.FAILED
await session.commit()
raise