feat(podcasts): add celery tasks

This commit is contained in:
CREDO23 2026-06-10 18:44:03 +02:00
parent 4271048dcf
commit b2970ba37e
5 changed files with 282 additions and 0 deletions

View file

@ -0,0 +1,18 @@
"""Celery tasks driving the podcast lifecycle across its user gates.
One task per async phase: propose the brief, draft the transcript, render the
audio. Each is enqueued by the API after it performs the guarded status
transition, and each pushes its result onto the row for the frontend to observe.
"""
from __future__ import annotations
from .brief import propose_brief_task
from .draft import draft_transcript_task
from .render import render_audio_task
__all__ = [
"draft_transcript_task",
"propose_brief_task",
"render_audio_task",
]

View file

@ -0,0 +1,60 @@
"""Brief-proposal task: PENDING -> AWAITING_BRIEF.
Runs the (cheap, token-light) brief graph to detect language and propose a spec,
seeded with the user's last-used language/voice preferences. Pushes the result
straight onto the row so the frontend sees the brief gate open via Zero.
"""
from __future__ import annotations
import logging
from app.celery_app import celery_app
from app.podcasts.generation.brief.graph import graph as brief_graph
from app.podcasts.generation.brief.state import BriefState
from app.podcasts.persistence import PodcastRepository
from app.podcasts.service import PodcastService, preferences_from
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
from .runtime import mark_failed
logger = logging.getLogger(__name__)
@celery_app.task(name="podcast.propose_brief", bind=True)
def propose_brief_task(self, podcast_id: int, search_space_id: int) -> dict:
try:
return run_async_celery_task(
lambda: _propose_brief(podcast_id, search_space_id)
)
except Exception as exc: # noqa: BLE001 - record and report, never crash worker
logger.error("Podcast %s brief proposal failed: %s", podcast_id, exc)
run_async_celery_task(lambda: mark_failed(podcast_id, str(exc)))
return {"status": "failed", "podcast_id": podcast_id}
async def _propose_brief(podcast_id: int, search_space_id: int) -> dict:
async with get_celery_session_maker()() as session:
repo = PodcastRepository(session)
podcast = await repo.get(podcast_id)
if podcast is None:
raise ValueError(f"podcast {podcast_id} not found")
last_language, last_voices = preferences_from(
await repo.latest_with_spec(search_space_id)
)
state = BriefState(
db_session=session, source_content=podcast.source_content or ""
)
config = {
"configurable": {
"search_space_id": search_space_id,
"last_used_language": last_language,
"last_used_voices": last_voices,
}
}
result = await brief_graph.ainvoke(state, config=config)
await PodcastService(session).attach_brief(podcast, result["spec"])
await session.commit()
return {"status": "awaiting_brief", "podcast_id": podcast_id}

View file

@ -0,0 +1,94 @@
"""Transcript-drafting task: DRAFTING -> AWAITING_REVIEW.
The expensive, LLM-heavy step, so it runs under ``billable_call`` exactly like
the legacy generator. The API has already moved the row to DRAFTING and stored
the approved brief; this task drafts the long-form transcript and opens the
go/no-go gate.
"""
from __future__ import annotations
import logging
from app.celery_app import celery_app
from app.config import config as app_config
from app.podcasts.generation.transcript.graph import graph as transcript_graph
from app.podcasts.generation.transcript.state import TranscriptState
from app.podcasts.persistence import PodcastRepository
from app.podcasts.service import PodcastService, read_spec
from app.services.billable_calls import (
BillingSettlementError,
QuotaInsufficientError,
_resolve_agent_billing_for_search_space,
billable_call,
)
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
from .runtime import billable_session, mark_failed
logger = logging.getLogger(__name__)
@celery_app.task(name="podcast.draft_transcript", bind=True)
def draft_transcript_task(self, podcast_id: int, search_space_id: int) -> dict:
try:
return run_async_celery_task(
lambda: _draft_transcript(podcast_id, search_space_id)
)
except Exception as exc: # noqa: BLE001 - record and report, never crash worker
logger.error("Podcast %s drafting failed: %s", podcast_id, exc)
run_async_celery_task(lambda: mark_failed(podcast_id, str(exc)))
return {"status": "failed", "podcast_id": podcast_id}
async def _draft_transcript(podcast_id: int, search_space_id: int) -> dict:
async with get_celery_session_maker()() as session:
repo = PodcastRepository(session)
service = PodcastService(session)
podcast = await repo.get(podcast_id)
if podcast is None:
raise ValueError(f"podcast {podcast_id} not found")
spec = read_spec(podcast)
if spec is None:
raise ValueError(f"podcast {podcast_id} has no approved brief")
owner_id, tier, base_model = await _resolve_agent_billing_for_search_space(
session, search_space_id, thread_id=podcast.thread_id
)
state = TranscriptState(
db_session=session, source_content=podcast.source_content or ""
)
config = {
"configurable": {
"search_space_id": search_space_id,
"spec": spec,
"focus": spec.focus,
}
}
try:
async with billable_call(
user_id=owner_id,
search_space_id=search_space_id,
billing_tier=tier,
base_model=base_model,
quota_reserve_micros_override=app_config.QUOTA_DEFAULT_PODCAST_RESERVE_MICROS,
usage_type="podcast_generation",
call_details={"podcast_id": podcast_id, "title": podcast.title},
billable_session_factory=billable_session,
):
result = await transcript_graph.ainvoke(state, config=config)
except QuotaInsufficientError:
await service.fail(podcast, "premium quota exhausted")
await session.commit()
return {"status": "failed", "podcast_id": podcast_id, "reason": "quota"}
except BillingSettlementError:
await service.fail(podcast, "billing settlement failed")
await session.commit()
return {"status": "failed", "podcast_id": podcast_id, "reason": "billing"}
await service.attach_transcript(podcast, result["transcript"])
await session.commit()
return {"status": "awaiting_review", "podcast_id": podcast_id}

View file

@ -0,0 +1,70 @@
"""Audio-rendering task: RENDERING -> READY.
Synthesises and merges the approved transcript, stores the MP3 in the object
store, and marks the podcast ready. The working directory is stable per podcast
so a re-render (e.g. after a voice change) reuses the segment cache.
"""
from __future__ import annotations
import logging
import tempfile
from pathlib import Path
from app.celery_app import celery_app
from app.podcasts.persistence import PodcastRepository
from app.podcasts.rendering import PodcastRenderer
from app.podcasts.service import PodcastService, read_spec, read_transcript
from app.podcasts.storage import store_audio
from app.podcasts.tts import get_text_to_speech
from app.podcasts.voices import get_voice_catalog
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
from .runtime import mark_failed
logger = logging.getLogger(__name__)
_WORKDIR_BASE = Path(tempfile.gettempdir()) / "surfsense_podcasts"
@celery_app.task(name="podcast.render_audio", bind=True)
def render_audio_task(self, podcast_id: int) -> dict:
try:
return run_async_celery_task(lambda: _render_audio(podcast_id))
except Exception as exc: # noqa: BLE001 - record and report, never crash worker
logger.error("Podcast %s render failed: %s", podcast_id, exc)
run_async_celery_task(lambda: mark_failed(podcast_id, str(exc)))
return {"status": "failed", "podcast_id": podcast_id}
async def _render_audio(podcast_id: int) -> dict:
async with get_celery_session_maker()() as session:
repo = PodcastRepository(session)
podcast = await repo.get(podcast_id)
if podcast is None:
raise ValueError(f"podcast {podcast_id} not found")
spec = read_spec(podcast)
transcript = read_transcript(podcast)
if spec is None or transcript is None:
raise ValueError(f"podcast {podcast_id} is missing brief or transcript")
renderer = PodcastRenderer(
tts=get_text_to_speech(), catalog=get_voice_catalog()
)
workdir = _WORKDIR_BASE / str(podcast_id)
workdir.mkdir(parents=True, exist_ok=True)
rendered = await renderer.render(
spec=spec, transcript=transcript, workdir=workdir
)
backend_name, key = await store_audio(
search_space_id=podcast.search_space_id,
podcast_id=podcast_id,
data=rendered.data,
)
await PodcastService(session).attach_audio(
podcast, storage_backend=backend_name, storage_key=key
)
await session.commit()
return {"status": "ready", "podcast_id": podcast_id}

View file

@ -0,0 +1,40 @@
"""Shared plumbing for the podcast Celery tasks.
Each task runs its async body via :func:`run_async_celery_task` and, on any
failure, records the reason on the row through the lifecycle service. Marking
failed is best-effort: a podcast that already reached a terminal state is left
untouched rather than forced.
"""
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from app.podcasts.persistence import PodcastRepository
from app.podcasts.service import PodcastError, PodcastService
from app.tasks.celery_tasks import get_celery_session_maker
logger = logging.getLogger(__name__)
@asynccontextmanager
async def billable_session():
"""Session factory for ``billable_call`` inside the worker loop."""
async with get_celery_session_maker()() as session:
yield session
async def mark_failed(podcast_id: int, error: str) -> None:
"""Best-effort: move a non-terminal podcast to FAILED with ``error``."""
async with get_celery_session_maker()() as session:
repo = PodcastRepository(session)
podcast = await repo.get(podcast_id)
if podcast is None:
return
try:
await PodcastService(session).fail(podcast, error)
await session.commit()
except PodcastError:
# Already terminal (e.g. cancelled): nothing to record.
logger.info("Podcast %s already terminal; not marking failed", podcast_id)