Merge remote-tracking branch 'upstream/dev' into feat/unified-model-connections

This commit is contained in:
Anish Sarkar 2026-06-13 19:04:49 +05:30
commit ab5423d2d2
45 changed files with 775 additions and 272 deletions

View file

@ -9,6 +9,8 @@ then enqueues the matching Celery task; lifecycle errors map to 409/422.
from __future__ import annotations
import os
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException, Response
@ -27,10 +29,10 @@ from app.db import (
from app.podcasts.generation.brief import propose_brief
from app.podcasts.persistence import Podcast, PodcastRepository
from app.podcasts.service import (
InvalidTransition,
InvalidTransitionError,
PodcastService,
PreconditionFailed,
SpecConflict,
PreconditionFailedError,
SpecConflictError,
)
from app.podcasts.storage import open_audio_stream, purge_audio
from app.podcasts.tasks import draft_transcript_task
@ -45,6 +47,7 @@ from app.utils.rbac import check_permission
from .schemas import (
CreatePodcastRequest,
LanguageOptions,
PodcastDetail,
PodcastSummary,
UpdateSpecRequest,
@ -112,6 +115,20 @@ async def list_voices(language: str | None = None):
]
@router.get("/podcasts/languages", response_model=LanguageOptions)
async def list_languages():
"""Languages the active TTS provider can offer the brief editor."""
if not app_config.TTS_SERVICE:
raise HTTPException(status_code=503, detail="No TTS provider configured")
provider = provider_from_service(app_config.TTS_SERVICE)
offering = get_voice_catalog().offerable_languages(provider)
return LanguageOptions(
languages=offering.languages,
allows_custom=offering.allows_custom,
)
@router.get("/podcasts/voices/{voice_id}/preview")
async def preview_voice(
voice_id: str,
@ -324,19 +341,12 @@ async def _load(
return podcast
class _lifecycle_errors:
@asynccontextmanager
async def _lifecycle_errors() -> AsyncIterator[None]:
"""Map service lifecycle errors onto HTTP responses."""
async def __aenter__(self) -> None:
return None
async def __aexit__(self, exc_type, exc, tb) -> bool:
if exc is None:
return False
if isinstance(exc, SpecConflict):
raise HTTPException(status_code=409, detail=str(exc)) from exc
if isinstance(exc, InvalidTransition):
raise HTTPException(status_code=409, detail=str(exc)) from exc
if isinstance(exc, PreconditionFailed):
raise HTTPException(status_code=422, detail=str(exc)) from exc
return False
try:
yield
except (SpecConflictError, InvalidTransitionError) as exc:
raise HTTPException(status_code=409, detail=str(exc)) from exc
except PreconditionFailedError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc

View file

@ -51,6 +51,17 @@ class VoiceOption(BaseModel):
gender: str
class LanguageOptions(BaseModel):
"""The languages the brief editor may offer for the active provider.
When ``allows_custom`` is true the list is a curated starting point and
the editor accepts any BCP-47 tag beyond it.
"""
languages: list[str]
allows_custom: bool
class PodcastSummary(BaseModel):
"""Lightweight list item."""

View file

@ -23,7 +23,9 @@ class StructuredOutputError(RuntimeError):
"""The model reply could not be parsed into the expected shape."""
async def invoke_json(llm, messages: list[BaseMessage], model: type[T]) -> T:
async def invoke_json[T: BaseModel](
llm, messages: list[BaseMessage], model: type[T]
) -> T:
"""Invoke ``llm`` and validate its reply as ``model``."""
response = await llm.ainvoke(messages)
content = strip_markdown_fences(extract_text_content(response.content))

View file

@ -18,7 +18,7 @@ from app.services.llm_service import get_agent_llm
from ..prompts import draft_segment_prompt, plan_outline_prompt
from ..structured import invoke_json
from .config import TranscriptConfig
from .planning import Outline, OutlineSegment, SegmentDraft
from .planning import Outline, SegmentDraft
from .state import TranscriptState
# Average speaking rate; converts target minutes to a target word count.

View file

@ -32,7 +32,7 @@ async def concat_to_mp3(segment_paths: list[Path], output_path: Path) -> None:
.output(str(output_path), {"c:a": "libmp3lame"})
)
await ffmpeg.execute()
except Exception as exc: # noqa: BLE001 - normalise ffmpeg failures
except Exception as exc:
raise RenderError(f"audio merge failed: {exc}") from exc
finally:
list_file.unlink(missing_ok=True)

View file

@ -77,9 +77,7 @@ class PodcastRenderer:
await concat_to_mp3(list(segment_paths), output_path)
return RenderedPodcast(data=output_path.read_bytes(), container="mp3")
def _request_for(
self, spec: PodcastSpec, turn: TranscriptTurn
) -> SynthesisRequest:
def _request_for(self, spec: PodcastSpec, turn: TranscriptTurn) -> SynthesisRequest:
try:
speaker = spec.speaker_for(turn.speaker)
except KeyError as exc:
@ -132,7 +130,7 @@ class _SegmentSynthesizer:
if owner:
try:
path = await self._synthesize(request, key)
except BaseException as exc: # noqa: BLE001 - relayed to all waiters
except BaseException as exc:
future.set_exception(exc)
else:
future.set_result(path)

View file

@ -70,7 +70,9 @@ class SpeakerSpec(BaseModel):
model_config = ConfigDict(extra="forbid")
slot: int = Field(..., ge=0, description="Stable index a transcript turn references")
slot: int = Field(
..., ge=0, description="Stable index a transcript turn references"
)
name: str = Field(..., min_length=1, max_length=120)
role: SpeakerRole
voice_id: str = Field(

View file

@ -59,11 +59,11 @@ class PodcastError(RuntimeError):
"""Base class for lifecycle errors."""
class InvalidTransition(PodcastError):
class InvalidTransitionError(PodcastError):
"""A requested status change is not permitted from the current state."""
class SpecConflict(PodcastError):
class SpecConflictError(PodcastError):
"""A spec edit raced another: the expected version is stale."""
def __init__(self, expected: int, actual: int) -> None:
@ -74,7 +74,7 @@ class SpecConflict(PodcastError):
self.actual = actual
class PreconditionFailed(PodcastError):
class PreconditionFailedError(PodcastError):
"""A transition's data precondition (brief/transcript present) is unmet."""
@ -110,12 +110,12 @@ class PodcastService:
) -> Podcast:
"""Edit the brief at the gate, guarded by optimistic concurrency."""
if _status(podcast) is not PodcastStatus.AWAITING_BRIEF:
raise InvalidTransition(
raise InvalidTransitionError(
f"the brief can only be edited while awaiting_brief, "
f"not {_status(podcast).value}"
)
if expected_version != podcast.spec_version:
raise SpecConflict(expected_version, podcast.spec_version)
raise SpecConflictError(expected_version, podcast.spec_version)
podcast.spec = spec.model_dump(mode="json")
podcast.spec_version += 1
await self._session.flush()
@ -124,7 +124,7 @@ class PodcastService:
async def begin_drafting(self, podcast: Podcast) -> Podcast:
"""Approve the brief and start transcript drafting."""
if podcast.spec is None:
raise PreconditionFailed("cannot draft without a brief")
raise PreconditionFailedError("cannot draft without a brief")
self._transition(podcast, PodcastStatus.DRAFTING)
await self._session.flush()
return podcast
@ -145,13 +145,13 @@ class PodcastService:
async def regenerate(self, podcast: Podcast) -> Podcast:
"""Reopen the brief gate; the saved spec becomes the new starting point."""
if _status(podcast) not in self._REGENERABLE:
raise InvalidTransition(
raise InvalidTransitionError(
f"nothing to regenerate from {_status(podcast).value}"
)
# Legacy episodes finished before briefs existed; a gate with nothing
# to review would strand them.
if podcast.spec is None:
raise PreconditionFailed("cannot regenerate without a brief")
raise PreconditionFailedError("cannot regenerate without a brief")
self._transition(podcast, PodcastStatus.AWAITING_BRIEF)
await self._session.flush()
return podcast
@ -164,7 +164,7 @@ class PodcastService:
has no regeneration to revert and is rejected.
"""
if not has_stored_episode(podcast):
raise InvalidTransition("no finished episode to fall back to")
raise InvalidTransitionError("no finished episode to fall back to")
self._transition(podcast, PodcastStatus.READY)
await self._session.flush()
return podcast
@ -200,7 +200,7 @@ class PodcastService:
backing out goes through revert_regeneration instead.
"""
if has_stored_episode(podcast):
raise InvalidTransition(
raise InvalidTransitionError(
"a finished episode exists; revert the regeneration instead"
)
self._transition(podcast, PodcastStatus.CANCELLED)
@ -210,7 +210,7 @@ class PodcastService:
def _transition(self, podcast: Podcast, target: PodcastStatus) -> None:
current = _status(podcast)
if target not in _ALLOWED[current]:
raise InvalidTransition(
raise InvalidTransitionError(
f"{current.value} -> {target.value} is not allowed"
)
podcast.status = target

View file

@ -36,9 +36,10 @@ def draft_transcript_task(self, podcast_id: int, search_space_id: int) -> dict:
return run_async_celery_task(
lambda: _draft_transcript(podcast_id, search_space_id)
)
except Exception as exc: # noqa: BLE001 - record and report, never crash worker
except Exception as exc:
logger.error("Podcast %s drafting failed: %s", podcast_id, exc)
run_async_celery_task(lambda: mark_failed(podcast_id, str(exc)))
message = str(exc)
run_async_celery_task(lambda: mark_failed(podcast_id, message))
return {"status": "failed", "podcast_id": podcast_id}

View file

@ -15,7 +15,7 @@ from app.celery_app import celery_app
from app.podcasts.persistence import PodcastRepository
from app.podcasts.rendering import PodcastRenderer
from app.podcasts.service import (
InvalidTransition,
InvalidTransitionError,
PodcastService,
read_spec,
read_transcript,
@ -36,9 +36,10 @@ _WORKDIR_BASE = Path(tempfile.gettempdir()) / "surfsense_podcasts"
def render_audio_task(self, podcast_id: int) -> dict:
try:
return run_async_celery_task(lambda: _render_audio(podcast_id))
except Exception as exc: # noqa: BLE001 - record and report, never crash worker
except Exception as exc:
logger.error("Podcast %s render failed: %s", podcast_id, exc)
run_async_celery_task(lambda: mark_failed(podcast_id, str(exc)))
message = str(exc)
run_async_celery_task(lambda: mark_failed(podcast_id, message))
return {"status": "failed", "podcast_id": podcast_id}
@ -75,7 +76,7 @@ async def _render_audio(podcast_id: int) -> dict:
podcast, storage_backend=backend_name, storage_key=key
)
await session.commit()
except InvalidTransition:
except InvalidTransitionError:
# A user back-out won the race (e.g. the regeneration was
# reverted): drop the stale render and leave the row alone.
await purge_audio_object(key)

View file

@ -50,9 +50,7 @@ class KokoroTextToSpeech(TextToSpeech):
async def synthesize(self, request: SynthesisRequest) -> SynthesizedAudio:
if not isinstance(request.voice, str):
raise TextToSpeechError(
"Kokoro voices are named by string, not a mapping"
)
raise TextToSpeechError("Kokoro voices are named by string, not a mapping")
pipeline = self._pipeline_for(request.language)
loop = asyncio.get_event_loop()
@ -67,7 +65,7 @@ class KokoroTextToSpeech(TextToSpeech):
),
)
segments = [audio for _gs, _ps, audio in generator]
except Exception as exc: # noqa: BLE001 - normalise provider errors
except Exception as exc:
raise TextToSpeechError(f"Kokoro synthesis failed: {exc}") from exc
if not segments:

View file

@ -57,10 +57,8 @@ class LiteLlmTextToSpeech(TextToSpeech):
try:
response = await aspeech(**kwargs)
except Exception as exc: # noqa: BLE001 - normalise provider errors
raise TextToSpeechError(
f"{self._model} synthesis failed: {exc}"
) from exc
except Exception as exc:
raise TextToSpeechError(f"{self._model} synthesis failed: {exc}") from exc
data = getattr(response, "content", None)
if not data:

View file

@ -6,7 +6,7 @@ configured provider via :func:`provider_from_service`.
from __future__ import annotations
from .catalog import VoiceCatalog, get_voice_catalog
from .catalog import LanguageOffering, VoiceCatalog, get_voice_catalog
from .preview import render_voice_preview
from .provider import TtsProvider, provider_from_service
from .voice import ANY_LANGUAGE, CatalogVoice, VoiceGender
@ -14,6 +14,7 @@ from .voice import ANY_LANGUAGE, CatalogVoice, VoiceGender
__all__ = [
"ANY_LANGUAGE",
"CatalogVoice",
"LanguageOffering",
"TtsProvider",
"VoiceCatalog",
"VoiceGender",

View file

@ -9,11 +9,26 @@ provider-native reference.
from __future__ import annotations
from collections.abc import Iterable
from dataclasses import dataclass
from functools import lru_cache
from .data import AZURE_VOICES, KOKORO_VOICES, OPENAI_VOICES, VERTEX_VOICES
from .data.languages import COMMON_LANGUAGES
from .provider import TtsProvider
from .voice import CatalogVoice
from .voice import ANY_LANGUAGE, CatalogVoice
@dataclass(frozen=True, slots=True)
class LanguageOffering:
"""The languages a provider's roster can offer the brief form.
``allows_custom`` is true when the roster has wildcard voices: the listed
languages are then a curated starting point, not a limit, and any BCP-47
tag may be entered.
"""
languages: list[str]
allows_custom: bool
class VoiceCatalog:
@ -36,9 +51,7 @@ class VoiceCatalog:
"""All voices offered by ``provider``, in catalog order."""
return list(self._by_provider.get(provider, ()))
def for_language(
self, provider: TtsProvider, language: str
) -> list[CatalogVoice]:
def for_language(self, provider: TtsProvider, language: str) -> list[CatalogVoice]:
"""``provider`` voices that can render ``language``, in catalog order."""
return [v for v in self.for_provider(provider) if v.speaks(language)]
@ -46,10 +59,22 @@ class VoiceCatalog:
"""Whether ``provider`` has at least one voice for ``language``."""
return any(v.speaks(language) for v in self.for_provider(provider))
def offerable_languages(self, provider: TtsProvider) -> LanguageOffering:
"""The languages ``provider`` can offer up front.
Language-bound voices contribute their concrete tags; wildcard voices
cannot enumerate languages, so their presence merges in the curated
common list and opens free entry.
"""
voices = self.for_provider(provider)
tags = {v.language for v in voices if v.language != ANY_LANGUAGE}
has_wildcard = any(v.language == ANY_LANGUAGE for v in voices)
if has_wildcard:
tags.update(COMMON_LANGUAGES)
return LanguageOffering(languages=sorted(tags), allows_custom=has_wildcard)
@lru_cache(maxsize=1)
def get_voice_catalog() -> VoiceCatalog:
"""The process-wide catalog assembled from every provider's roster."""
return VoiceCatalog(
(*KOKORO_VOICES, *OPENAI_VOICES, *AZURE_VOICES, *VERTEX_VOICES)
)
return VoiceCatalog((*KOKORO_VOICES, *OPENAI_VOICES, *AZURE_VOICES, *VERTEX_VOICES))

View file

@ -0,0 +1,33 @@
"""Curated languages offered when a roster has wildcard (any-language) voices.
OpenAI-style multilingual voices speak whatever language the text is in, so
there is no provider list to enumerate. This is the set the brief form offers
up front for such providers; it is an offering, not a limit the API flags
``allows_custom`` so users can enter any BCP-47 tag beyond it.
"""
from __future__ import annotations
COMMON_LANGUAGES: tuple[str, ...] = (
"ar",
"bn",
"de",
"en",
"es",
"fr",
"hi",
"id",
"it",
"ja",
"ko",
"nl",
"pl",
"pt",
"ru",
"sw",
"th",
"tr",
"uk",
"vi",
"zh",
)

View file

@ -30,10 +30,52 @@ def _voice(
VERTEX_VOICES: tuple[CatalogVoice, ...] = (
_voice("en-US-Studio-O", "en-US", "en-US", "en-US-Studio-O", "Studio O (US)", VoiceGender.FEMALE),
_voice("en-US-Studio-M", "en-US", "en-US", "en-US-Studio-M", "Studio M (US)", VoiceGender.MALE),
_voice("en-GB-Studio-A", "en-GB", "en-UK", "en-UK-Studio-A", "Studio A (UK)", VoiceGender.FEMALE),
_voice("en-GB-Studio-B", "en-GB", "en-UK", "en-UK-Studio-B", "Studio B (UK)", VoiceGender.MALE),
_voice("en-AU-Studio-A", "en-AU", "en-AU", "en-AU-Studio-A", "Studio A (AU)", VoiceGender.FEMALE),
_voice("en-AU-Studio-B", "en-AU", "en-AU", "en-AU-Studio-B", "Studio B (AU)", VoiceGender.MALE),
_voice(
"en-US-Studio-O",
"en-US",
"en-US",
"en-US-Studio-O",
"Studio O (US)",
VoiceGender.FEMALE,
),
_voice(
"en-US-Studio-M",
"en-US",
"en-US",
"en-US-Studio-M",
"Studio M (US)",
VoiceGender.MALE,
),
_voice(
"en-GB-Studio-A",
"en-GB",
"en-UK",
"en-UK-Studio-A",
"Studio A (UK)",
VoiceGender.FEMALE,
),
_voice(
"en-GB-Studio-B",
"en-GB",
"en-UK",
"en-UK-Studio-B",
"Studio B (UK)",
VoiceGender.MALE,
),
_voice(
"en-AU-Studio-A",
"en-AU",
"en-AU",
"en-AU-Studio-A",
"Studio A (AU)",
VoiceGender.FEMALE,
),
_voice(
"en-AU-Studio-B",
"en-AU",
"en-AU",
"en-AU-Studio-B",
"Studio B (AU)",
VoiceGender.MALE,
),
)

View file

@ -30,7 +30,7 @@ _SAMPLE_TEXTS = {
"it": "Ciao! Questa è la mia voce quando racconto il tuo podcast.",
"ja": "こんにちは。ポッドキャストをお届けするときの私の声です。",
"pt": "Olá! É assim que eu soo ao narrar o seu podcast.",
"zh": "你好!这就是我为你播报播客时的声音。",
"zh": "你好!这就是我为你播报播客时的声音。", # noqa: RUF001
}
_CONTENT_TYPES = {"mp3": "audio/mpeg", "wav": "audio/wav"}
@ -40,9 +40,7 @@ async def render_voice_preview(
voice: CatalogVoice, tts: TextToSpeech
) -> tuple[bytes, str]:
"""Return ``(audio_bytes, content_type)`` for a sample spoken by ``voice``."""
language = (
_FALLBACK_LANGUAGE if voice.language == ANY_LANGUAGE else voice.language
)
language = _FALLBACK_LANGUAGE if voice.language == ANY_LANGUAGE else voice.language
request = SynthesisRequest(
text=_sample_text(language), voice=voice.native_ref, language=language
)

View file

@ -59,7 +59,7 @@ def _card_error_payment_intent_id(exc: CardError) -> str | None:
@celery_app.task(name="auto_reload_credits")
def auto_reload_credits_task(user_id: str):
"""Charge the user's saved card to top up credits when below threshold."""
return run_async_celery_task(_auto_reload_credits, user_id)
return run_async_celery_task(lambda: _auto_reload_credits(user_id))
async def _auto_reload_credits(user_id: str) -> None:

View file

@ -36,7 +36,11 @@ def iter_completion_emission_frames(
"success",
)
elif status in ("failed", "error"):
error_msg = out.get("error", "Unknown error") if isinstance(out, dict) else "Unknown error"
error_msg = (
out.get("error", "Unknown error")
if isinstance(out, dict)
else "Unknown error"
)
yield ctx.streaming_service.format_terminal_info(
f"Podcast generation failed: {error_msg}",
"error",

View file

@ -86,66 +86,54 @@ def _quote_identifier(identifier: str) -> str:
return '"' + identifier.replace('"', '""') + '"'
def _column_exists(conn: Connection, table: str, column: str) -> bool:
return (
conn.execute(
text(
"SELECT 1 FROM information_schema.columns "
"WHERE table_schema = current_schema() "
"AND table_name = :table AND column_name = :column"
),
{"table": table, "column": column},
).fetchone()
is not None
)
def _table_columns(conn: Connection, table: str) -> set[str]:
rows = conn.execute(
text(
"SELECT column_name FROM information_schema.columns "
"WHERE table_schema = current_schema() AND table_name = :table"
),
{"table": table},
).fetchall()
return {row[0] for row in rows}
def _table_exists(conn: Connection, table: str) -> bool:
return (
conn.execute(
text(
"SELECT 1 FROM information_schema.tables "
"WHERE table_schema = current_schema() "
"AND table_name = :table"
),
{"table": table},
).fetchone()
is not None
)
def _expected_columns(
conn: Connection, table: str, *, include_missing_columns: bool = True
) -> list[str] | None:
def _expected_columns(conn: Connection, table: str) -> list[str] | None:
columns = ZERO_PUBLICATION[table]
if columns is None:
return None
if include_missing_columns:
expected = list(columns)
else:
expected = [column for column in columns if _column_exists(conn, table, column)]
if table in {"documents", "user", "podcasts"} and _column_exists(
conn, table, "_0_version"
expected = list(columns)
if table in {"documents", "user", "podcasts"} and "_0_version" in _table_columns(
conn, table
):
expected.append("_0_version")
return expected
def _format_table_entry(
conn: Connection, table: str, *, include_missing_columns: bool = True
) -> str | None:
if not include_missing_columns and not _table_exists(conn, table):
def _format_table_entry(conn: Connection, table: str) -> str | None:
"""Render one SET TABLE entry, or ``None`` if the table isn't ready.
Historical migrations (e.g. 155/156) call ``apply_publication`` while the
schema is still mid-history, before later migrations add columns that the
canonical shape references. A table is only published once it exists AND
every canonical column exists; otherwise it is omitted entirely and a later
reconcile migration (e.g. 159) picks it up once its columns land. Partial
column lists are deliberately avoided: publishing a column early would
block later ``ALTER COLUMN ... TYPE`` migrations on it (Postgres forbids
retyping columns a publication depends on). ``verify_publication`` remains
strict against the unfiltered canonical shape.
"""
actual = _table_columns(conn, table)
if not actual:
return None
columns = _expected_columns(
conn, table, include_missing_columns=include_missing_columns
)
table_sql = _quote_identifier(table)
columns = _expected_columns(conn, table)
if columns is None:
return table_sql
if not include_missing_columns and not columns:
if any(column not in actual for column in columns):
return None
column_sql = ", ".join(_quote_identifier(column) for column in columns)
@ -155,17 +143,8 @@ def _format_table_entry(
def build_set_table_sql(conn: Connection) -> str:
"""Build the canonical plain SET TABLE statement for Zero's event triggers."""
table_entries = [
entry
for table in ZERO_PUBLICATION
if (
entry := _format_table_entry(
conn, table, include_missing_columns=False
)
)
is not None
]
table_list = ", ".join(table_entries)
entries = [_format_table_entry(conn, table) for table in ZERO_PUBLICATION]
table_list = ", ".join(entry for entry in entries if entry is not None)
return f"ALTER PUBLICATION {_quote_identifier(PUBLICATION_NAME)} SET TABLE {table_list}"