refactor(podcasts): propose brief inline at create

This commit is contained in:
CREDO23 2026-06-10 20:51:51 +02:00
parent aa7aa81c16
commit bae59140a6
6 changed files with 61 additions and 73 deletions

View file

@ -182,7 +182,6 @@ celery_app = Celery(
include=[
"app.tasks.celery_tasks.document_tasks",
"app.tasks.celery_tasks.podcast_tasks",
"app.podcasts.tasks.brief",
"app.podcasts.tasks.draft",
"app.podcasts.tasks.render",
"app.tasks.celery_tasks.video_presentation_tasks",

View file

@ -16,6 +16,7 @@ from fastapi.responses import StreamingResponse
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config as app_config
from app.db import (
Permission,
SearchSpace,
@ -23,6 +24,7 @@ from app.db import (
User,
get_async_session,
)
from app.podcasts.generation.brief import propose_brief
from app.podcasts.persistence import Podcast, PodcastRepository
from app.podcasts.service import (
InvalidTransition,
@ -33,11 +35,9 @@ from app.podcasts.service import (
from app.podcasts.storage import open_audio_stream, purge_audio
from app.podcasts.tasks import (
draft_transcript_task,
propose_brief_task,
render_audio_task,
)
from app.podcasts.voices import get_voice_catalog, provider_from_service
from app.config import config as app_config
from app.users import current_active_user
from app.utils.rbac import check_permission
@ -118,15 +118,24 @@ async def create_podcast(
):
await _require(session, user, body.search_space_id, Permission.PODCASTS_CREATE)
podcast = await PodcastService(session).create(
service = PodcastService(session)
podcast = await service.create(
title=body.title,
search_space_id=body.search_space_id,
thread_id=body.thread_id,
)
podcast.source_content = body.source_content
await session.commit()
propose_brief_task.delay(podcast.id, body.search_space_id)
spec = await propose_brief(
session,
search_space_id=body.search_space_id,
speaker_count=body.speaker_count,
min_minutes=body.min_minutes,
max_minutes=body.max_minutes,
focus=body.focus,
)
await service.attach_brief(podcast, spec)
await session.commit()
return PodcastDetail.of(podcast)

View file

@ -1,9 +1,10 @@
"""Brief planning: propose a reviewable spec from weak signals."""
"""Brief planning: propose a reviewable spec from last-used preferences."""
from __future__ import annotations
from .config import BriefConfig
from .graph import build_brief_graph
from .propose import propose_brief
from .state import BriefState
__all__ = ["BriefConfig", "BriefState", "build_brief_graph"]
__all__ = ["BriefConfig", "BriefState", "build_brief_graph", "propose_brief"]

View file

@ -0,0 +1,40 @@
"""Propose a podcast's initial brief spec."""
from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession
from app.podcasts.persistence import PodcastRepository
from app.podcasts.schemas import PodcastSpec
from app.podcasts.service import preferences_from
from .config import DEFAULT_MAX_MINUTES, DEFAULT_MIN_MINUTES, DEFAULT_SPEAKER_COUNT
from .graph import graph as brief_graph
from .state import BriefState
async def propose_brief(
session: AsyncSession,
*,
search_space_id: int,
speaker_count: int = DEFAULT_SPEAKER_COUNT,
min_minutes: int = DEFAULT_MIN_MINUTES,
max_minutes: int = DEFAULT_MAX_MINUTES,
focus: str | None = None,
) -> PodcastSpec:
"""Reuse the last-used language and voices, else English; return the spec."""
last_language, last_voices = preferences_from(
await PodcastRepository(session).latest_with_spec(search_space_id)
)
config = {
"configurable": {
"speaker_count": speaker_count,
"min_minutes": min_minutes,
"max_minutes": max_minutes,
"focus": focus,
"last_used_language": last_language,
"last_used_voices": last_voices,
}
}
result = await brief_graph.ainvoke(BriefState(), config=config)
return result["spec"]

View file

@ -1,18 +1,17 @@
"""Celery tasks driving the podcast lifecycle across its user gates.
"""Celery tasks driving the podcast lifecycle across its expensive phases.
One task per async phase: propose the brief, draft the transcript, render the
audio. Each is enqueued by the API after it performs the guarded status
One task per heavy async phase: draft the transcript (LLM) and render the audio
(TTS). The brief is deterministic and proposed inline at create time, so it has
no task. Each task is enqueued by the API after it performs the guarded status
transition, and each pushes its result onto the row for the frontend to observe.
"""
from __future__ import annotations
from .brief import propose_brief_task
from .draft import draft_transcript_task
from .render import render_audio_task
__all__ = [
"draft_transcript_task",
"propose_brief_task",
"render_audio_task",
]

View file

@ -1,60 +0,0 @@
"""Brief-proposal task: PENDING -> AWAITING_BRIEF.
Runs the (cheap, token-light) brief graph to detect language and propose a spec,
seeded with the user's last-used language/voice preferences. Pushes the result
straight onto the row so the frontend sees the brief gate open via Zero.
"""
from __future__ import annotations
import logging
from app.celery_app import celery_app
from app.podcasts.generation.brief.graph import graph as brief_graph
from app.podcasts.generation.brief.state import BriefState
from app.podcasts.persistence import PodcastRepository
from app.podcasts.service import PodcastService, preferences_from
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
from .runtime import mark_failed
logger = logging.getLogger(__name__)
@celery_app.task(name="podcast.propose_brief", bind=True)
def propose_brief_task(self, podcast_id: int, search_space_id: int) -> dict:
try:
return run_async_celery_task(
lambda: _propose_brief(podcast_id, search_space_id)
)
except Exception as exc: # noqa: BLE001 - record and report, never crash worker
logger.error("Podcast %s brief proposal failed: %s", podcast_id, exc)
run_async_celery_task(lambda: mark_failed(podcast_id, str(exc)))
return {"status": "failed", "podcast_id": podcast_id}
async def _propose_brief(podcast_id: int, search_space_id: int) -> dict:
async with get_celery_session_maker()() as session:
repo = PodcastRepository(session)
podcast = await repo.get(podcast_id)
if podcast is None:
raise ValueError(f"podcast {podcast_id} not found")
last_language, last_voices = preferences_from(
await repo.latest_with_spec(search_space_id)
)
state = BriefState(
db_session=session, source_content=podcast.source_content or ""
)
config = {
"configurable": {
"search_space_id": search_space_id,
"last_used_language": last_language,
"last_used_voices": last_voices,
}
}
result = await brief_graph.ainvoke(state, config=config)
await PodcastService(session).attach_brief(podcast, result["spec"])
await session.commit()
return {"status": "awaiting_brief", "podcast_id": podcast_id}