diff --git a/surfsense_backend/app/podcasts/tasks/__init__.py b/surfsense_backend/app/podcasts/tasks/__init__.py new file mode 100644 index 000000000..32f7c3a72 --- /dev/null +++ b/surfsense_backend/app/podcasts/tasks/__init__.py @@ -0,0 +1,18 @@ +"""Celery tasks driving the podcast lifecycle across its user gates. + +One task per async phase: propose the brief, draft the transcript, render the +audio. Each is enqueued by the API after it performs the guarded status +transition, and each pushes its result onto the row for the frontend to observe. +""" + +from __future__ import annotations + +from .brief import propose_brief_task +from .draft import draft_transcript_task +from .render import render_audio_task + +__all__ = [ + "draft_transcript_task", + "propose_brief_task", + "render_audio_task", +] diff --git a/surfsense_backend/app/podcasts/tasks/brief.py b/surfsense_backend/app/podcasts/tasks/brief.py new file mode 100644 index 000000000..2f1e3f240 --- /dev/null +++ b/surfsense_backend/app/podcasts/tasks/brief.py @@ -0,0 +1,60 @@ +"""Brief-proposal task: PENDING -> AWAITING_BRIEF. + +Runs the (cheap, token-light) brief graph to detect language and propose a spec, +seeded with the user's last-used language/voice preferences. Pushes the result +straight onto the row so the frontend sees the brief gate open via Zero. +""" + +from __future__ import annotations + +import logging + +from app.celery_app import celery_app +from app.podcasts.generation.brief.graph import graph as brief_graph +from app.podcasts.generation.brief.state import BriefState +from app.podcasts.persistence import PodcastRepository +from app.podcasts.service import PodcastService, preferences_from +from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task + +from .runtime import mark_failed + +logger = logging.getLogger(__name__) + + +@celery_app.task(name="podcast.propose_brief", bind=True) +def propose_brief_task(self, podcast_id: int, search_space_id: int) -> dict: + try: + return run_async_celery_task( + lambda: _propose_brief(podcast_id, search_space_id) + ) + except Exception as exc: # noqa: BLE001 - record and report, never crash worker + logger.error("Podcast %s brief proposal failed: %s", podcast_id, exc) + run_async_celery_task(lambda: mark_failed(podcast_id, str(exc))) + return {"status": "failed", "podcast_id": podcast_id} + + +async def _propose_brief(podcast_id: int, search_space_id: int) -> dict: + async with get_celery_session_maker()() as session: + repo = PodcastRepository(session) + podcast = await repo.get(podcast_id) + if podcast is None: + raise ValueError(f"podcast {podcast_id} not found") + + last_language, last_voices = preferences_from( + await repo.latest_with_spec(search_space_id) + ) + state = BriefState( + db_session=session, source_content=podcast.source_content or "" + ) + config = { + "configurable": { + "search_space_id": search_space_id, + "last_used_language": last_language, + "last_used_voices": last_voices, + } + } + result = await brief_graph.ainvoke(state, config=config) + + await PodcastService(session).attach_brief(podcast, result["spec"]) + await session.commit() + return {"status": "awaiting_brief", "podcast_id": podcast_id} diff --git a/surfsense_backend/app/podcasts/tasks/draft.py b/surfsense_backend/app/podcasts/tasks/draft.py new file mode 100644 index 000000000..8d461bf9b --- /dev/null +++ b/surfsense_backend/app/podcasts/tasks/draft.py @@ -0,0 +1,94 @@ +"""Transcript-drafting task: DRAFTING -> AWAITING_REVIEW. + +The expensive, LLM-heavy step, so it runs under ``billable_call`` exactly like +the legacy generator. The API has already moved the row to DRAFTING and stored +the approved brief; this task drafts the long-form transcript and opens the +go/no-go gate. +""" + +from __future__ import annotations + +import logging + +from app.celery_app import celery_app +from app.config import config as app_config +from app.podcasts.generation.transcript.graph import graph as transcript_graph +from app.podcasts.generation.transcript.state import TranscriptState +from app.podcasts.persistence import PodcastRepository +from app.podcasts.service import PodcastService, read_spec +from app.services.billable_calls import ( + BillingSettlementError, + QuotaInsufficientError, + _resolve_agent_billing_for_search_space, + billable_call, +) +from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task + +from .runtime import billable_session, mark_failed + +logger = logging.getLogger(__name__) + + +@celery_app.task(name="podcast.draft_transcript", bind=True) +def draft_transcript_task(self, podcast_id: int, search_space_id: int) -> dict: + try: + return run_async_celery_task( + lambda: _draft_transcript(podcast_id, search_space_id) + ) + except Exception as exc: # noqa: BLE001 - record and report, never crash worker + logger.error("Podcast %s drafting failed: %s", podcast_id, exc) + run_async_celery_task(lambda: mark_failed(podcast_id, str(exc))) + return {"status": "failed", "podcast_id": podcast_id} + + +async def _draft_transcript(podcast_id: int, search_space_id: int) -> dict: + async with get_celery_session_maker()() as session: + repo = PodcastRepository(session) + service = PodcastService(session) + podcast = await repo.get(podcast_id) + if podcast is None: + raise ValueError(f"podcast {podcast_id} not found") + + spec = read_spec(podcast) + if spec is None: + raise ValueError(f"podcast {podcast_id} has no approved brief") + + owner_id, tier, base_model = await _resolve_agent_billing_for_search_space( + session, search_space_id, thread_id=podcast.thread_id + ) + + state = TranscriptState( + db_session=session, source_content=podcast.source_content or "" + ) + config = { + "configurable": { + "search_space_id": search_space_id, + "spec": spec, + "focus": spec.focus, + } + } + + try: + async with billable_call( + user_id=owner_id, + search_space_id=search_space_id, + billing_tier=tier, + base_model=base_model, + quota_reserve_micros_override=app_config.QUOTA_DEFAULT_PODCAST_RESERVE_MICROS, + usage_type="podcast_generation", + call_details={"podcast_id": podcast_id, "title": podcast.title}, + billable_session_factory=billable_session, + ): + result = await transcript_graph.ainvoke(state, config=config) + except QuotaInsufficientError: + await service.fail(podcast, "premium quota exhausted") + await session.commit() + return {"status": "failed", "podcast_id": podcast_id, "reason": "quota"} + except BillingSettlementError: + await service.fail(podcast, "billing settlement failed") + await session.commit() + return {"status": "failed", "podcast_id": podcast_id, "reason": "billing"} + + await service.attach_transcript(podcast, result["transcript"]) + await session.commit() + return {"status": "awaiting_review", "podcast_id": podcast_id} diff --git a/surfsense_backend/app/podcasts/tasks/render.py b/surfsense_backend/app/podcasts/tasks/render.py new file mode 100644 index 000000000..04fb9ab9d --- /dev/null +++ b/surfsense_backend/app/podcasts/tasks/render.py @@ -0,0 +1,70 @@ +"""Audio-rendering task: RENDERING -> READY. + +Synthesises and merges the approved transcript, stores the MP3 in the object +store, and marks the podcast ready. The working directory is stable per podcast +so a re-render (e.g. after a voice change) reuses the segment cache. +""" + +from __future__ import annotations + +import logging +import tempfile +from pathlib import Path + +from app.celery_app import celery_app +from app.podcasts.persistence import PodcastRepository +from app.podcasts.rendering import PodcastRenderer +from app.podcasts.service import PodcastService, read_spec, read_transcript +from app.podcasts.storage import store_audio +from app.podcasts.tts import get_text_to_speech +from app.podcasts.voices import get_voice_catalog +from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task + +from .runtime import mark_failed + +logger = logging.getLogger(__name__) + +_WORKDIR_BASE = Path(tempfile.gettempdir()) / "surfsense_podcasts" + + +@celery_app.task(name="podcast.render_audio", bind=True) +def render_audio_task(self, podcast_id: int) -> dict: + try: + return run_async_celery_task(lambda: _render_audio(podcast_id)) + except Exception as exc: # noqa: BLE001 - record and report, never crash worker + logger.error("Podcast %s render failed: %s", podcast_id, exc) + run_async_celery_task(lambda: mark_failed(podcast_id, str(exc))) + return {"status": "failed", "podcast_id": podcast_id} + + +async def _render_audio(podcast_id: int) -> dict: + async with get_celery_session_maker()() as session: + repo = PodcastRepository(session) + podcast = await repo.get(podcast_id) + if podcast is None: + raise ValueError(f"podcast {podcast_id} not found") + + spec = read_spec(podcast) + transcript = read_transcript(podcast) + if spec is None or transcript is None: + raise ValueError(f"podcast {podcast_id} is missing brief or transcript") + + renderer = PodcastRenderer( + tts=get_text_to_speech(), catalog=get_voice_catalog() + ) + workdir = _WORKDIR_BASE / str(podcast_id) + workdir.mkdir(parents=True, exist_ok=True) + rendered = await renderer.render( + spec=spec, transcript=transcript, workdir=workdir + ) + + backend_name, key = await store_audio( + search_space_id=podcast.search_space_id, + podcast_id=podcast_id, + data=rendered.data, + ) + await PodcastService(session).attach_audio( + podcast, storage_backend=backend_name, storage_key=key + ) + await session.commit() + return {"status": "ready", "podcast_id": podcast_id} diff --git a/surfsense_backend/app/podcasts/tasks/runtime.py b/surfsense_backend/app/podcasts/tasks/runtime.py new file mode 100644 index 000000000..349aeffb2 --- /dev/null +++ b/surfsense_backend/app/podcasts/tasks/runtime.py @@ -0,0 +1,40 @@ +"""Shared plumbing for the podcast Celery tasks. + +Each task runs its async body via :func:`run_async_celery_task` and, on any +failure, records the reason on the row through the lifecycle service. Marking +failed is best-effort: a podcast that already reached a terminal state is left +untouched rather than forced. +""" + +from __future__ import annotations + +import logging +from contextlib import asynccontextmanager + +from app.podcasts.persistence import PodcastRepository +from app.podcasts.service import PodcastError, PodcastService +from app.tasks.celery_tasks import get_celery_session_maker + +logger = logging.getLogger(__name__) + + +@asynccontextmanager +async def billable_session(): + """Session factory for ``billable_call`` inside the worker loop.""" + async with get_celery_session_maker()() as session: + yield session + + +async def mark_failed(podcast_id: int, error: str) -> None: + """Best-effort: move a non-terminal podcast to FAILED with ``error``.""" + async with get_celery_session_maker()() as session: + repo = PodcastRepository(session) + podcast = await repo.get(podcast_id) + if podcast is None: + return + try: + await PodcastService(session).fail(podcast, error) + await session.commit() + except PodcastError: + # Already terminal (e.g. cancelled): nothing to record. + logger.info("Podcast %s already terminal; not marking failed", podcast_id)