Merge commit '7ce409c580' into dev

This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-06-16 22:48:14 -07:00
commit 0fe650fd8e
27 changed files with 510 additions and 134 deletions

View file

@ -106,6 +106,7 @@ services:
volumes:
- ../surfsense_backend/app:/app/app
- shared_temp:/shared_tmp
- object_store:/app/.local_object_store
env_file:
- ../surfsense_backend/.env
extra_hosts:
@ -119,6 +120,7 @@ services:
- PYTHONPATH=/app
- UVICORN_LOOP=asyncio
- UNSTRUCTURED_HAS_PATCHED_LOOP=1
- FILE_STORAGE_LOCAL_PATH=/app/.local_object_store
- LANGCHAIN_TRACING_V2=false
- LANGSMITH_TRACING=false
- AUTH_TYPE=${AUTH_TYPE:-LOCAL}
@ -171,6 +173,7 @@ services:
volumes:
- ../surfsense_backend/app:/app/app
- shared_temp:/shared_tmp
- object_store:/app/.local_object_store
env_file:
- ../surfsense_backend/.env
extra_hosts:
@ -182,6 +185,7 @@ services:
- REDIS_APP_URL=${REDIS_URL:-redis://redis:6379/0}
- CELERY_TASK_DEFAULT_QUEUE=surfsense
- PYTHONPATH=/app
- FILE_STORAGE_LOCAL_PATH=/app/.local_object_store
- SEARXNG_DEFAULT_HOST=${SEARXNG_DEFAULT_HOST:-http://searxng:8080}
- SERVICE_ROLE=worker
depends_on:
@ -277,6 +281,8 @@ volumes:
name: surfsense-dev-redis
shared_temp:
name: surfsense-dev-shared-temp
object_store:
name: surfsense-dev-object-store
zero_cache_data:
name: surfsense-dev-zero-cache
whatsapp_sessions:

View file

@ -129,6 +129,7 @@ services:
- "8000"
volumes:
- shared_temp:/shared_tmp
- object_store:/app/.local_object_store
env_file:
- .env
extra_hosts:
@ -142,6 +143,7 @@ services:
PYTHONPATH: /app
UVICORN_LOOP: asyncio
UNSTRUCTURED_HAS_PATCHED_LOOP: "1"
FILE_STORAGE_LOCAL_PATH: /app/.local_object_store
NEXT_FRONTEND_URL: ${NEXT_FRONTEND_URL:-${SURFSENSE_PUBLIC_URL:-http://localhost:${LISTEN_HTTP_PORT:-3929}}}
BACKEND_URL: ${BACKEND_URL:-${SURFSENSE_PUBLIC_URL:-http://localhost:${LISTEN_HTTP_PORT:-3929}}}
SEARXNG_DEFAULT_HOST: ${SEARXNG_DEFAULT_HOST:-http://searxng:8080}
@ -195,6 +197,7 @@ services:
image: ghcr.io/modsetter/surfsense-backend:${SURFSENSE_VERSION:-latest}${SURFSENSE_VARIANT:+-${SURFSENSE_VARIANT}}
volumes:
- shared_temp:/shared_tmp
- object_store:/app/.local_object_store
env_file:
- .env
extra_hosts:
@ -206,6 +209,7 @@ services:
REDIS_APP_URL: ${REDIS_URL:-redis://redis:6379/0}
CELERY_TASK_DEFAULT_QUEUE: surfsense
PYTHONPATH: /app
FILE_STORAGE_LOCAL_PATH: /app/.local_object_store
SEARXNG_DEFAULT_HOST: ${SEARXNG_DEFAULT_HOST:-http://searxng:8080}
SERVICE_ROLE: worker
depends_on:
@ -305,6 +309,8 @@ volumes:
name: surfsense-redis
shared_temp:
name: surfsense-shared-temp
object_store:
name: surfsense-object-store
zero_cache_data:
name: surfsense-zero-cache
caddy_data:

View file

@ -1,5 +1,20 @@
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense
# --- Database startup / safety knobs (optional) ---
# Run extension/table/index DDL on app startup. Set FALSE when schema is owned
# exclusively by Alembic migrations.
# DB_BOOTSTRAP_ON_STARTUP=TRUE
# lock_timeout (ms) for boot-time DDL so a contended CREATE INDEX/TABLE fails
# fast instead of hanging the FastAPI lifespan behind another transaction.
# DB_DDL_LOCK_TIMEOUT_MS=5000
# idle_in_transaction_session_timeout (ms) so an abandoned "idle in transaction"
# session can't wedge the DB indefinitely. 0 disables. (asyncpg only)
# DB_IDLE_IN_TX_TIMEOUT_MS=900000
# Same, for the Celery worker engine (long ingestion/podcast/video tasks). If a
# task hasn't touched the DB in this window it's treated as orphaned and dropped.
# 0 disables. (asyncpg only)
# DB_CELERY_IDLE_IN_TX_TIMEOUT_MS=3600000
# Deployment environment: dev or production
SURFSENSE_ENV=dev

View file

@ -486,6 +486,28 @@ class Config:
# Database
DATABASE_URL = os.getenv("DATABASE_URL")
# When TRUE (default) the app ensures extensions/tables/indexes exist on
# startup. Set FALSE in environments where schema is owned exclusively by
# Alembic migrations to skip all boot-time DDL.
DB_BOOTSTRAP_ON_STARTUP = (
os.getenv("DB_BOOTSTRAP_ON_STARTUP", "TRUE").upper() == "TRUE"
)
# Per-session lock_timeout (ms) applied to boot-time DDL so a contended
# CREATE INDEX / CREATE TABLE fails fast instead of hanging the FastAPI
# lifespan forever behind another transaction's lock.
DB_DDL_LOCK_TIMEOUT_MS = int(os.getenv("DB_DDL_LOCK_TIMEOUT_MS", "5000"))
# Global idle_in_transaction_session_timeout (ms) applied to every pooled
# connection so an abandoned "idle in transaction" session can't wedge the
# database indefinitely. 0 disables. Only applied to asyncpg connections.
DB_IDLE_IN_TX_TIMEOUT_MS = int(os.getenv("DB_IDLE_IN_TX_TIMEOUT_MS", "900000"))
# Same protection for the separate Celery worker engine, where long-running
# ingestion/podcast/video tasks live. Kept higher than the web default so a
# legitimate per-document embed window is never reaped: if a task hasn't
# touched the DB in 60 min it's treated as orphaned and dropped. 0 disables.
DB_CELERY_IDLE_IN_TX_TIMEOUT_MS = int(
os.getenv("DB_CELERY_IDLE_IN_TX_TIMEOUT_MS", "3600000")
)
# Celery / Redis
# Redis (single endpoint for Celery broker, result backend, and app cache).
# Legacy CELERY_BROKER_URL / CELERY_RESULT_BACKEND / REDIS_APP_URL still

View file

@ -1,3 +1,4 @@
import logging
import uuid
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
@ -34,6 +35,8 @@ from app.config import config
if config.AUTH_TYPE == "GOOGLE":
from fastapi_users.db import SQLAlchemyBaseOAuthAccountTableUUID
logger = logging.getLogger(__name__)
DATABASE_URL = config.DATABASE_URL
@ -2726,6 +2729,28 @@ from app.podcasts.persistence import ( # noqa: E402, F401
PodcastStatus,
)
def _build_connect_args() -> dict:
    """Assemble the driver-level ``connect_args`` for the main async engine.

    When the configured idle-in-transaction timeout is positive and the
    database URL names the asyncpg driver, the returned dict asks Postgres
    (through asyncpg's ``server_settings``) to apply
    ``idle_in_transaction_session_timeout`` to every new connection, so a
    session left idle inside an open transaction is terminated by the server
    instead of holding its locks indefinitely. In every other case an empty
    dict is returned.
    """
    timeout_ms = config.DB_IDLE_IN_TX_TIMEOUT_MS
    timeout_enabled = bool(timeout_ms) and timeout_ms > 0
    uses_asyncpg = bool(DATABASE_URL) and "asyncpg" in DATABASE_URL
    if not (timeout_enabled and uses_asyncpg):
        return {}
    # ``server_settings`` is an asyncpg-only connect argument, hence the
    # driver check above.
    return {
        "server_settings": {
            "idle_in_transaction_session_timeout": str(timeout_ms)
        }
    }
engine = create_async_engine(
DATABASE_URL,
pool_size=30,
@ -2733,6 +2758,7 @@ engine = create_async_engine(
pool_recycle=1800,
pool_pre_ping=True,
pool_timeout=30,
connect_args=_build_connect_args(),
)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
@ -2757,54 +2783,117 @@ async def shielded_async_session():
await session.close()
async def setup_indexes():
async with engine.begin() as conn:
# Create indexes
# Document embedding indexes
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS document_vector_index ON documents USING hnsw (embedding public.vector_cosine_ops)"
)
)
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS document_search_index ON documents USING gin (to_tsvector('english', content))"
)
)
# Document Chuck Indexes
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS chucks_vector_index ON chunks USING hnsw (embedding public.vector_cosine_ops)"
)
)
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS chucks_search_index ON chunks USING gin (to_tsvector('english', content))"
)
)
# pg_trgm indexes for efficient ILIKE '%term%' searches on titles
# Critical for document mention picker (@mentions) to scale
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_documents_title_trgm ON documents USING gin (title gin_trgm_ops)"
)
)
# B-tree index on search_space_id for fast filtering
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_documents_search_space_id ON documents (search_space_id)"
)
)
# Covering index for "recent documents" query - enables index-only scan
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_documents_search_space_updated ON documents (search_space_id, updated_at DESC NULLS LAST) INCLUDE (id, title, document_type)"
)
# (index_name, table, CREATE statement). Built with CONCURRENTLY so an index
# build only takes a non-blocking ShareUpdateExclusiveLock — ingestion
# INSERT/UPDATE on documents/chunks keep flowing while the index builds, and a
# slow build can never freeze the FastAPI lifespan or block writers.
_INDEX_DEFINITIONS: list[tuple[str, str, str]] = [
(
"document_vector_index",
"documents",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS document_vector_index ON documents USING hnsw (embedding public.vector_cosine_ops)",
),
(
"document_search_index",
"documents",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS document_search_index ON documents USING gin (to_tsvector('english', content))",
),
(
"chucks_vector_index",
"chunks",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS chucks_vector_index ON chunks USING hnsw (embedding public.vector_cosine_ops)",
),
(
"chucks_search_index",
"chunks",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS chucks_search_index ON chunks USING gin (to_tsvector('english', content))",
),
# pg_trgm index for efficient ILIKE '%term%' searches on titles — critical
# for the document mention picker (@mentions) to scale.
(
"idx_documents_title_trgm",
"documents",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_title_trgm ON documents USING gin (title gin_trgm_ops)",
),
(
"idx_documents_search_space_id",
"documents",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_search_space_id ON documents (search_space_id)",
),
# Covering index for "recent documents" query — enables index-only scan.
(
"idx_documents_search_space_updated",
"documents",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_search_space_updated ON documents (search_space_id, updated_at DESC NULLS LAST) INCLUDE (id, title, document_type)",
),
]
async def _drop_invalid_index(conn, name: str) -> None:
    """Remove the index called ``name`` if Postgres has it flagged invalid.

    An interrupted ``CREATE INDEX CONCURRENTLY`` leaves an index behind with
    ``indisvalid = false``. Since the name is then taken, a subsequent
    ``CREATE INDEX CONCURRENTLY IF NOT EXISTS`` is a no-op and the unusable
    index would stay in place; dropping it here lets the caller rebuild it.

    Args:
        conn: Connection used for both the catalog lookup and the drop.
        name: Name of the index to inspect.

    Nothing happens when the index does not exist or is already valid.
    """
    lookup = text(
        "SELECT indisvalid FROM pg_index WHERE indexrelid = to_regclass(:n)"
    )
    row = (await conn.execute(lookup, {"n": name})).first()
    # No catalog row -> index absent; anything but an explicit False -> valid.
    if row is None or row[0] is not False:
        return
    logger.warning(
        "[startup] dropping invalid leftover index %s before rebuild", name
    )
    await conn.execute(text(f'DROP INDEX CONCURRENTLY IF EXISTS "{name}"'))
async def setup_indexes() -> None:
    """Ensure the search/vector indexes exist without blocking startup.

    Every entry of ``_INDEX_DEFINITIONS`` is processed on one AUTOCOMMIT
    connection: a leftover invalid index of the same name is dropped first,
    then the ``CREATE INDEX CONCURRENTLY IF NOT EXISTS`` statement is run.
    A session-level ``lock_timeout`` taken from ``DB_DDL_LOCK_TIMEOUT_MS``
    makes a contended statement fail fast rather than wait indefinitely.

    Failures are logged and swallowed per index, so one index that cannot be
    built does not prevent the others from being attempted and does not
    raise out of this function.

    The ``lock_timeout`` is reset before the connection goes back to the
    pool. It is set with a plain ``SET`` outside any transaction, so it would
    otherwise stay on the pooled connection and apply to whatever request
    reuses it next.
    """
    lock_timeout_ms = int(config.DB_DDL_LOCK_TIMEOUT_MS)
    # AUTOCOMMIT is mandatory: CREATE INDEX CONCURRENTLY cannot run inside a
    # transaction block.
    async with engine.connect() as base_conn:
        conn = await base_conn.execution_options(isolation_level="AUTOCOMMIT")
        await conn.execute(text(f"SET lock_timeout = {lock_timeout_ms}"))
        try:
            for name, table, ddl in _INDEX_DEFINITIONS:
                try:
                    await _drop_invalid_index(conn, name)
                    await conn.execute(text(ddl))
                except Exception as exc:
                    # Non-fatal by design: a missing index is retried next boot.
                    logger.warning(
                        "[startup] index %s on %s not ready (%s: %s); "
                        "will retry on next boot",
                        name,
                        table,
                        exc.__class__.__name__,
                        exc,
                    )
        finally:
            # Best effort: restore the default so the DDL-only timeout does
            # not follow this connection back into the pool.
            try:
                await conn.execute(text("RESET lock_timeout"))
            except Exception as exc:
                logger.warning(
                    "[startup] could not reset lock_timeout (%s: %s)",
                    exc.__class__.__name__,
                    exc,
                )
async def create_db_and_tables():
if not config.DB_BOOTSTRAP_ON_STARTUP:
logger.info(
"[startup] DB bootstrap skipped (DB_BOOTSTRAP_ON_STARTUP=FALSE); "
"schema/indexes are expected to be managed by migrations"
)
return
lock_timeout_ms = int(config.DB_DDL_LOCK_TIMEOUT_MS)
async with engine.begin() as conn:
# Fail fast instead of hanging forever if another session holds a
# conflicting lock on a table we need to touch.
await conn.execute(text(f"SET LOCAL lock_timeout = {lock_timeout_ms}"))
await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm"))
await conn.run_sync(Base.metadata.create_all)

View file

@ -27,14 +27,14 @@ from app.db import (
get_async_session,
)
from app.podcasts.generation.brief import propose_brief
from app.podcasts.persistence import Podcast, PodcastRepository
from app.podcasts.persistence import Podcast, PodcastRepository, PodcastStatus
from app.podcasts.service import (
InvalidTransitionError,
PodcastService,
PreconditionFailedError,
SpecConflictError,
)
from app.podcasts.storage import open_audio_stream, purge_audio
from app.podcasts.storage import audio_exists, open_audio_stream, purge_audio
from app.podcasts.tasks import draft_transcript_task
from app.podcasts.tts import get_text_to_speech
from app.podcasts.voices import (
@ -172,8 +172,8 @@ async def create_podcast(
session,
search_space_id=body.search_space_id,
speaker_count=body.speaker_count,
min_minutes=body.min_minutes,
max_minutes=body.max_minutes,
min_seconds=body.min_seconds,
max_seconds=body.max_seconds,
focus=body.focus,
)
await service.attach_brief(podcast, spec)
@ -287,6 +287,11 @@ async def stream_podcast(
podcast = await _load(session, user, podcast_id, Permission.PODCASTS_READ)
if podcast.storage_key:
# Verify first so a missing object is a 404, not a mid-stream crash.
if not await audio_exists(podcast):
raise HTTPException(
status_code=404, detail="Podcast audio is no longer available"
)
return StreamingResponse(
open_audio_stream(podcast),
media_type="audio/mpeg",
@ -310,7 +315,10 @@ async def stream_podcast(
},
)
raise HTTPException(status_code=404, detail="Podcast audio not found")
# No audio: terminal states never will have any, otherwise it's in flight.
if PodcastStatus(podcast.status).is_terminal:
raise HTTPException(status_code=404, detail="Podcast audio not found")
raise HTTPException(status_code=409, detail="Podcast audio is not ready yet")
async def _require(

View file

@ -11,6 +11,12 @@ from datetime import datetime
from pydantic import BaseModel, ConfigDict, Field
from app.podcasts.duration_limits import (
DEFAULT_MAX_SECONDS,
DEFAULT_MIN_SECONDS,
MAX_DURATION_SECONDS,
MIN_DURATION_SECONDS,
)
from app.podcasts.persistence import Podcast, PodcastStatus
from app.podcasts.schemas import PodcastSpec, Transcript
from app.podcasts.service import has_stored_episode, read_spec, read_transcript
@ -18,8 +24,6 @@ from app.podcasts.service import has_stored_episode, read_spec, read_transcript
# Defaults applied when a create request omits brief sizing; the brief gate lets
# the user adjust before any cost is incurred.
DEFAULT_SPEAKER_COUNT = 2
DEFAULT_MIN_MINUTES = 10
DEFAULT_MAX_MINUTES = 20
class CreatePodcastRequest(BaseModel):
@ -30,8 +34,16 @@ class CreatePodcastRequest(BaseModel):
source_content: str = Field(..., min_length=1)
thread_id: int | None = None
speaker_count: int = Field(default=DEFAULT_SPEAKER_COUNT, ge=1, le=6)
min_minutes: int = Field(default=DEFAULT_MIN_MINUTES, ge=1)
max_minutes: int = Field(default=DEFAULT_MAX_MINUTES, ge=1)
min_seconds: int = Field(
default=DEFAULT_MIN_SECONDS,
ge=MIN_DURATION_SECONDS,
le=MAX_DURATION_SECONDS,
)
max_seconds: int = Field(
default=DEFAULT_MAX_SECONDS,
ge=MIN_DURATION_SECONDS,
le=MAX_DURATION_SECONDS,
)
focus: str | None = Field(default=None, max_length=2000)

View file

@ -0,0 +1,6 @@
"""Shared bounds and defaults for podcast target duration."""
# Upper bound for a target duration: 24 hours, expressed in seconds.
MAX_DURATION_SECONDS = 24 * 60 * 60
# Lower bound for a target duration, in seconds.
MIN_DURATION_SECONDS = 15
# Default target range, in seconds, used when a caller supplies no sizing.
# NOTE(review): the minute-based defaults these replace were 10 and 20
# minutes; confirm that a 20-30 second default is intended rather than a
# missed "* 60".
DEFAULT_MIN_SECONDS = 20
DEFAULT_MAX_SECONDS = 30

View file

@ -6,10 +6,13 @@ from dataclasses import dataclass, field, fields
from langchain_core.runnables import RunnableConfig
from app.podcasts.duration_limits import (
DEFAULT_MAX_SECONDS,
DEFAULT_MIN_SECONDS,
)
# Sensible defaults for a fresh brief; the user adjusts the range at the gate.
DEFAULT_SPEAKER_COUNT = 2
DEFAULT_MIN_MINUTES = 10
DEFAULT_MAX_MINUTES = 20
@dataclass(kw_only=True)
@ -17,8 +20,8 @@ class BriefConfig:
"""Signals used to propose a brief; everything here is non-LLM context."""
speaker_count: int = DEFAULT_SPEAKER_COUNT
min_minutes: int = DEFAULT_MIN_MINUTES
max_minutes: int = DEFAULT_MAX_MINUTES
min_seconds: int = DEFAULT_MIN_SECONDS
max_seconds: int = DEFAULT_MAX_SECONDS
focus: str | None = None
last_used_language: str | None = None
last_used_voices: list[str] = field(default_factory=list)

View file

@ -79,7 +79,7 @@ def propose_spec(state: BriefState, config: RunnableConfig) -> dict[str, Any]:
style=PodcastStyle.CONVERSATIONAL,
speakers=speakers,
duration=DurationTarget(
min_minutes=brief.min_minutes, max_minutes=brief.max_minutes
min_seconds=brief.min_seconds, max_seconds=brief.max_seconds
),
focus=brief.focus,
)

View file

@ -4,11 +4,12 @@ from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession
from app.podcasts.duration_limits import DEFAULT_MAX_SECONDS, DEFAULT_MIN_SECONDS
from app.podcasts.persistence import PodcastRepository
from app.podcasts.schemas import PodcastSpec
from app.podcasts.service import preferences_from
from .config import DEFAULT_MAX_MINUTES, DEFAULT_MIN_MINUTES, DEFAULT_SPEAKER_COUNT
from .config import DEFAULT_SPEAKER_COUNT
from .graph import graph as brief_graph
from .state import BriefState
@ -18,8 +19,8 @@ async def propose_brief(
*,
search_space_id: int,
speaker_count: int = DEFAULT_SPEAKER_COUNT,
min_minutes: int = DEFAULT_MIN_MINUTES,
max_minutes: int = DEFAULT_MAX_MINUTES,
min_seconds: int = DEFAULT_MIN_SECONDS,
max_seconds: int = DEFAULT_MAX_SECONDS,
focus: str | None = None,
) -> PodcastSpec:
"""Reuse the last-used language and voices, else English; return the spec."""
@ -29,8 +30,8 @@ async def propose_brief(
config = {
"configurable": {
"speaker_count": speaker_count,
"min_minutes": min_minutes,
"max_minutes": max_minutes,
"min_seconds": min_seconds,
"max_seconds": max_seconds,
"focus": focus,
"last_used_language": last_language,
"last_used_voices": last_voices,

View file

@ -38,7 +38,7 @@ async def plan_outline(
tc = TranscriptConfig.from_runnable_config(config)
llm = await _require_llm(state, tc)
target_words = round(tc.spec.duration.midpoint_minutes * _WORDS_PER_MINUTE)
target_words = round(tc.spec.duration.midpoint_seconds * _WORDS_PER_MINUTE / 60)
suggested_segments = max(1, round(target_words / _WORDS_PER_SEGMENT))
messages = [

View file

@ -10,17 +10,19 @@ from __future__ import annotations
import re
from enum import StrEnum
from typing import Any
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from app.podcasts.duration_limits import (
MAX_DURATION_SECONDS,
MIN_DURATION_SECONDS,
)
# A speaker count beyond this is almost never a real podcast and explodes the
# voice/turn-attribution space, so we reject it at the brief gate.
MAX_SPEAKERS = 6
# Long-form is a goal, but an open-ended upper bound invites runaway TTS bills.
# One day of audio is a generous ceiling that still blocks obvious mistakes.
MAX_DURATION_MINUTES = 24 * 60
# BCP-47 primary subtag plus optional region (e.g. ``en``, ``en-US``, ``pt-BR``).
# Kept deliberately permissive: the voice catalog, not the brief, decides which
# languages can actually be synthesised. Casing is normalised after matching.
@ -91,7 +93,7 @@ class SpeakerSpec(BaseModel):
class DurationTarget(BaseModel):
"""The desired finished length as an inclusive minute range.
"""The desired finished length as an inclusive second range.
Drafting aims for the midpoint and treats the bounds as soft guardrails;
storing a range (rather than a point) keeps long-form expectations honest
@ -100,19 +102,38 @@ class DurationTarget(BaseModel):
model_config = ConfigDict(extra="forbid")
min_minutes: int = Field(..., ge=1, le=MAX_DURATION_MINUTES)
max_minutes: int = Field(..., ge=1, le=MAX_DURATION_MINUTES)
min_seconds: int = Field(..., ge=MIN_DURATION_SECONDS, le=MAX_DURATION_SECONDS)
max_seconds: int = Field(..., ge=MIN_DURATION_SECONDS, le=MAX_DURATION_SECONDS)
@model_validator(mode="before")
@classmethod
def _coerce_legacy_minutes(cls, data: Any) -> Any:
    """Translate legacy minute-based keys into their second-based fields.

    Rows stored before seconds-based briefs carry ``min_minutes`` /
    ``max_minutes``. Each bound is migrated on its own: a legacy key is
    converted (minutes * 60) only when its ``*_seconds`` counterpart is
    absent. A payload missing one of the legacy keys therefore ends up as a
    normal "field required" validation error rather than a ``KeyError``
    raised from inside this validator.

    Args:
        data: Raw input handed to the model; only dicts are inspected.

    Returns:
        ``data`` itself when nothing needed migrating, otherwise a shallow
        copy with the legacy keys replaced.
    """
    if not isinstance(data, dict):
        return data
    renames = (("min_minutes", "min_seconds"), ("max_minutes", "max_seconds"))
    migrated = None
    for legacy_key, current_key in renames:
        if legacy_key in data and current_key not in data:
            if migrated is None:
                # Copy lazily so the caller's dict is never mutated.
                migrated = dict(data)
            migrated[current_key] = int(migrated.pop(legacy_key)) * 60
    return data if migrated is None else migrated
@model_validator(mode="after")
def _check_order(self) -> DurationTarget:
if self.max_minutes < self.min_minutes:
raise ValueError("max_minutes must be >= min_minutes")
if self.max_seconds < self.min_seconds:
raise ValueError("max_seconds must be >= min_seconds")
return self
@property
def midpoint_minutes(self) -> float:
def midpoint_seconds(self) -> float:
"""The runtime drafting should aim for within the range."""
return (self.min_minutes + self.max_minutes) / 2
return (self.min_seconds + self.max_seconds) / 2
@property
def midpoint_minutes(self) -> float:
return self.midpoint_seconds / 60
class PodcastSpec(BaseModel):

View file

@ -42,6 +42,13 @@ def open_audio_stream(podcast: Podcast) -> AsyncIterator[bytes]:
return get_storage_backend().open_stream(podcast.storage_key)
async def audio_exists(podcast: Podcast) -> bool:
    """Report whether the audio object behind ``podcast.storage_key`` exists.

    Returns ``False`` without consulting the storage backend when the podcast
    has no storage key; otherwise returns the backend's ``exists`` answer.
    """
    key = podcast.storage_key
    if not key:
        return False
    return await get_storage_backend().exists(key)
async def purge_audio(podcast: Podcast) -> None:
"""Delete a podcast's stored audio if present; a missing object is fine."""
await purge_audio_object(podcast.storage_key)

View file

@ -103,8 +103,14 @@ async def stream_public_podcast(
if storage_key:
from app.file_storage.factory import get_storage_backend
backend = get_storage_backend()
# Verify first so a missing object is a 404, not a mid-stream crash.
if not await backend.exists(storage_key):
raise HTTPException(
status_code=404, detail="Podcast audio is no longer available"
)
return StreamingResponse(
get_storage_backend().open_stream(storage_key),
backend.open_stream(storage_key),
media_type="audio/mpeg",
headers={"Accept-Ranges": "bytes"},
)

View file

@ -32,10 +32,27 @@ def get_celery_session_maker() -> async_sessionmaker:
"""
global _celery_engine, _celery_session_maker
if _celery_session_maker is None:
# Reap connections orphaned mid-transaction (e.g. a worker that hung or
# crashed mid-index) so they can't hold locks on documents/chunks and
# wedge writes — the failure mode that previously left an "idle in
# transaction" session holding locks for 11+ hours. Kept generous so a
# legitimate long per-document embed window is never killed.
connect_args: dict = {}
idle_ms = config.DB_CELERY_IDLE_IN_TX_TIMEOUT_MS
if (
idle_ms
and idle_ms > 0
and config.DATABASE_URL
and "asyncpg" in config.DATABASE_URL
):
connect_args["server_settings"] = {
"idle_in_transaction_session_timeout": str(idle_ms)
}
_celery_engine = create_async_engine(
config.DATABASE_URL,
poolclass=NullPool,
echo=False,
connect_args=connect_args,
)
with contextlib.suppress(Exception):
from app.observability.bootstrap import instrument_sqlalchemy_engine

View file

@ -120,6 +120,9 @@ class FakeStorageBackend:
async def open_stream(self, key: str) -> AsyncIterator[bytes]:
yield self.objects.get(key, b"audio-bytes")
async def exists(self, key: str) -> bool:
return key in self.objects
async def delete(self, key: str) -> None:
self.deleted.append(key)
@ -214,7 +217,7 @@ def build_spec(
slot=1, name="Guest", role=SpeakerRole.GUEST, voice_id=voice_ids[1]
),
],
duration=DurationTarget(min_minutes=10, max_minutes=20),
duration=DurationTarget(min_seconds=600, max_seconds=1200),
)

View file

@ -76,7 +76,7 @@ async def test_quota_denial_fails_the_podcast_without_a_transcript(
async def _deny(**_kwargs):
raise QuotaInsufficientError(
usage_type="podcast_generation",
balance_micros=0,
balance_micros=5_000_000,
remaining_micros=0,
)
yield # pragma: no cover - unreachable, satisfies the CM protocol

View file

@ -48,6 +48,22 @@ async def test_public_stream_serves_audio_via_storage_key(
assert resp.content == b"public-audio"
async def test_public_stream_404_when_object_missing(
client, db_session, db_search_space, db_user, fake_storage
):
await _snapshot(
db_session,
search_space_id=db_search_space.id,
user=db_user,
token="tok-gone",
podcasts=[{"original_id": 556, "storage_key": "podcasts/gone.mp3"}],
)
resp = await client.get("/api/v1/public/tok-gone/podcasts/556/stream")
assert resp.status_code == 404
async def test_public_stream_404_when_podcast_absent_from_snapshot(
client, db_session, db_search_space, db_user
):

View file

@ -1,8 +1,7 @@
"""Streaming a podcast's rendered audio over HTTP.
A ready podcast streams its bytes from the storage backend; a podcast with no
stored audio returns 404. Storage is an in-memory backend (the object store is a
system boundary).
A ready podcast streams its bytes; an in-flight one is 409, a stored-but-missing
object is 404. Storage is an in-memory backend (the object store is a boundary).
"""
from __future__ import annotations
@ -31,11 +30,23 @@ async def test_stream_serves_stored_audio(
assert resp.content == b"the-audio"
async def test_stream_404_when_no_audio(client, db_search_space, make_podcast):
async def test_stream_409_while_in_flight(client, db_search_space, make_podcast):
podcast = await make_podcast(
search_space_id=db_search_space.id, status=PodcastStatus.DRAFTING
)
resp = await client.get(f"{BASE}/{podcast.id}/stream")
assert resp.status_code == 409
async def test_stream_404_when_object_missing(
client, db_search_space, make_podcast, fake_storage
):
podcast = await make_podcast(
search_space_id=db_search_space.id, status=PodcastStatus.READY
)
resp = await client.get(f"{BASE}/{podcast.id}/stream")
assert resp.status_code == 404

View file

@ -31,8 +31,8 @@ def make_spec():
language: str = "en",
style: PodcastStyle = PodcastStyle.CONVERSATIONAL,
speakers: list[SpeakerSpec] | None = None,
min_minutes: int = 10,
max_minutes: int = 20,
min_seconds: int = 600,
max_seconds: int = 1200,
focus: str | None = None,
) -> PodcastSpec:
if speakers is None:
@ -54,7 +54,7 @@ def make_spec():
language=language,
style=style,
speakers=speakers,
duration=DurationTarget(min_minutes=min_minutes, max_minutes=max_minutes),
duration=DurationTarget(min_seconds=min_seconds, max_seconds=max_seconds),
focus=focus,
)

View file

@ -66,7 +66,7 @@ def _spec(voice_id: str) -> PodcastSpec:
speakers=[
SpeakerSpec(slot=0, name="Host", role=SpeakerRole.HOST, voice_id=voice_id)
],
duration=DurationTarget(min_minutes=5, max_minutes=10),
duration=DurationTarget(min_seconds=300, max_seconds=600),
)

View file

@ -57,7 +57,7 @@ def test_spec_normalizes_its_language_on_construction():
spec = PodcastSpec(
language="EN-us",
speakers=[_speaker(0)],
duration=DurationTarget(min_minutes=5, max_minutes=10),
duration=DurationTarget(min_seconds=300, max_seconds=600),
)
assert spec.language == "en-us"
@ -68,7 +68,7 @@ def test_speakers_must_have_unique_slots():
PodcastSpec(
language="en",
speakers=[_speaker(0), _speaker(0, voice_id="kokoro:af_bella")],
duration=DurationTarget(min_minutes=5, max_minutes=10),
duration=DurationTarget(min_seconds=300, max_seconds=600),
)
@ -77,7 +77,7 @@ def test_a_brief_needs_at_least_one_speaker():
PodcastSpec(
language="en",
speakers=[],
duration=DurationTarget(min_minutes=5, max_minutes=10),
duration=DurationTarget(min_seconds=300, max_seconds=600),
)
@ -86,7 +86,7 @@ def test_a_monologue_brief_carries_exactly_one_speaker():
language="en",
style=PodcastStyle.MONOLOGUE,
speakers=[_speaker(0)],
duration=DurationTarget(min_minutes=5, max_minutes=10),
duration=DurationTarget(min_seconds=300, max_seconds=600),
)
assert spec.style is PodcastStyle.MONOLOGUE
@ -98,18 +98,25 @@ def test_a_monologue_brief_rejects_multiple_speakers():
language="en",
style=PodcastStyle.MONOLOGUE,
speakers=[_speaker(0), _speaker(1, voice_id="kokoro:af_bella")],
duration=DurationTarget(min_minutes=5, max_minutes=10),
duration=DurationTarget(min_seconds=300, max_seconds=600),
)
def test_duration_rejects_an_inverted_range():
"""A max below the min is a user error caught at the brief gate."""
with pytest.raises(ValidationError):
DurationTarget(min_minutes=20, max_minutes=10)
DurationTarget(min_seconds=1200, max_seconds=600)
def test_duration_midpoint_is_where_drafting_aims():
assert DurationTarget(min_minutes=10, max_minutes=20).midpoint_minutes == 15
assert DurationTarget(min_seconds=600, max_seconds=1200).midpoint_seconds == 900
assert DurationTarget(min_seconds=600, max_seconds=1200).midpoint_minutes == 15
def test_duration_loads_legacy_minute_fields_from_json():
duration = DurationTarget.model_validate({"min_minutes": 10, "max_minutes": 20})
assert duration.min_seconds == 600
assert duration.max_seconds == 1200
def test_blank_focus_becomes_absent():
@ -117,7 +124,7 @@ def test_blank_focus_becomes_absent():
spec = PodcastSpec(
language="en",
speakers=[_speaker(0)],
duration=DurationTarget(min_minutes=5, max_minutes=10),
duration=DurationTarget(min_seconds=300, max_seconds=600),
focus=" ",
)
assert spec.focus is None
@ -127,7 +134,7 @@ def test_speaker_for_returns_the_speaker_bound_to_a_slot():
spec = PodcastSpec(
language="en",
speakers=[_speaker(0), _speaker(1, voice_id="kokoro:af_bella")],
duration=DurationTarget(min_minutes=5, max_minutes=10),
duration=DurationTarget(min_seconds=300, max_seconds=600),
)
assert spec.speaker_for(1).voice_id == "kokoro:af_bella"
@ -136,7 +143,7 @@ def test_speaker_for_raises_when_no_speaker_matches():
spec = PodcastSpec(
language="en",
speakers=[_speaker(0)],
duration=DurationTarget(min_minutes=5, max_minutes=10),
duration=DurationTarget(min_seconds=300, max_seconds=600),
)
with pytest.raises(KeyError):
spec.speaker_for(99)

View file

@ -105,7 +105,7 @@ async def test_ainvoke_propagates_quota_insufficient_error(monkeypatch):
async def _denying_billable_call(**_kwargs):
raise QuotaInsufficientError(
usage_type="vision_extraction",
balance_micros=0,
balance_micros=5_000_000,
remaining_micros=0,
)
yield # unreachable but required for asynccontextmanager type

View file

@ -98,7 +98,7 @@ async def _denying_billable_call(**kwargs):
_CALL_LOG.append(kwargs)
raise QuotaInsufficientError(
usage_type=kwargs.get("usage_type", "?"),
balance_micros=0,
balance_micros=5_000_000,
remaining_micros=0,
)
yield SimpleNamespace() # pragma: no cover

View file

@ -24,8 +24,10 @@ import {
} from "@/components/ui/select";
import { Textarea } from "@/components/ui/textarea";
import {
MAX_DURATION_SECONDS,
type LanguageOptions,
MAX_SPEAKERS,
MIN_DURATION_SECONDS,
type PodcastSpec,
type PodcastStyle,
podcastStyle,
@ -65,6 +67,9 @@ interface BriefReviewProps {
*/
export function BriefReview({ podcast, spec }: BriefReviewProps) {
const [draft, setDraft] = useState<PodcastSpec>(spec);
const [durationUnit, setDurationUnit] = useState<DurationUnit>(() =>
defaultDurationUnit(spec.duration.max_seconds),
);
const [voices, setVoices] = useState<VoiceOption[] | null>(null);
const [offering, setOffering] = useState<LanguageOptions | null>(null);
const [isSubmitting, setIsSubmitting] = useState(false);
@ -74,6 +79,7 @@ export function BriefReview({ podcast, spec }: BriefReviewProps) {
// biome-ignore lint/correctness/useExhaustiveDependencies: reset only when the server version moves
useEffect(() => {
setDraft(spec);
setDurationUnit(defaultDurationUnit(spec.duration.max_seconds));
}, [podcast.specVersion]);
useEffect(() => {
@ -326,39 +332,72 @@ export function BriefReview({ podcast, spec }: BriefReviewProps) {
))}
</div>
<div className="grid grid-cols-2 gap-4">
<div className="flex flex-col gap-2">
<Label htmlFor="podcast-min-minutes">Min length (minutes)</Label>
<Input
id="podcast-min-minutes"
type="number"
min={1}
value={draft.duration.min_minutes}
onChange={(e) =>
setDraft((current) => ({
...current,
duration: { ...current.duration, min_minutes: Number(e.target.value) || 1 },
}))
}
/>
<div className="flex flex-col gap-2">
<div className="flex items-center justify-between gap-3">
<Label>Target length</Label>
<Select
value={durationUnit}
onValueChange={(value) => setDurationUnit(value as DurationUnit)}
>
<SelectTrigger className="w-[7.5rem]" aria-label="Length unit">
<SelectValue />
</SelectTrigger>
<SelectContent>
<SelectItem value="seconds">Seconds</SelectItem>
<SelectItem value="minutes">Minutes</SelectItem>
<SelectItem value="hours">Hours</SelectItem>
</SelectContent>
</Select>
</div>
<div className="flex flex-col gap-2">
<Label htmlFor="podcast-max-minutes">Max length (minutes)</Label>
<Input
id="podcast-max-minutes"
type="number"
min={draft.duration.min_minutes}
value={draft.duration.max_minutes}
onChange={(e) =>
setDraft((current) => ({
...current,
duration: {
...current.duration,
max_minutes: Number(e.target.value) || current.duration.min_minutes,
},
}))
}
/>
<div className="grid grid-cols-2 gap-4">
<div className="flex flex-col gap-2">
<Label htmlFor="podcast-min-length">Min</Label>
<Input
id="podcast-min-length"
type="number"
min={durationUnitBounds(durationUnit).min}
max={durationUnitBounds(durationUnit).max}
step={durationInputStep(durationUnit)}
value={formatDurationForUnit(draft.duration.min_seconds, durationUnit)}
onChange={(e) => {
const seconds = clampDurationSeconds(
fromUnitValue(Number(e.target.value), durationUnit),
);
setDraft((current) => ({
...current,
duration: { ...current.duration, min_seconds: seconds },
}));
}}
/>
</div>
<div className="flex flex-col gap-2">
<Label htmlFor="podcast-max-length">Max</Label>
<Input
id="podcast-max-length"
type="number"
min={secondsToUnitValue(draft.duration.min_seconds, durationUnit)}
max={durationUnitBounds(durationUnit).max}
step={durationInputStep(durationUnit)}
value={formatDurationForUnit(draft.duration.max_seconds, durationUnit)}
onChange={(e) => {
const parsed = Number(e.target.value);
const fallback = secondsToUnitValue(
draft.duration.min_seconds,
durationUnit,
);
const seconds = clampDurationSeconds(
fromUnitValue(
Number.isFinite(parsed) ? parsed : fallback,
durationUnit,
),
);
setDraft((current) => ({
...current,
duration: { ...current.duration, max_seconds: seconds },
}));
}}
/>
</div>
</div>
</div>
@ -387,7 +426,9 @@ export function BriefReview({ podcast, spec }: BriefReviewProps) {
<Button
type="button"
onClick={handleApprove}
disabled={isSubmitting || draft.duration.max_minutes < draft.duration.min_minutes}
disabled={
isSubmitting || draft.duration.max_seconds < draft.duration.min_seconds
}
>
{isSubmitting ? <Loader2 className="size-4 animate-spin" /> : null}
{isDirty ? "Approve changes & draft transcript" : "Approve & draft transcript"}
@ -473,6 +514,50 @@ function LanguageCombobox({
/** The current selection stays listed even when it no longer matches the
* language filter, so the Select never renders an orphaned value. */
/** Units in which the target-length inputs can be displayed and edited. */
type DurationUnit = "seconds" | "minutes" | "hours";

/**
 * Chooses the unit a duration range is initially shown in, based on its
 * upper bound: hours from 3600 s upward, minutes from 60 s upward, and
 * seconds for anything shorter (including non-comparable values such as NaN).
 *
 * @param maxSeconds Upper bound of the duration range, in seconds.
 * @returns The coarsest unit whose threshold `maxSeconds` reaches.
 */
function defaultDurationUnit(maxSeconds: number): DurationUnit {
	return maxSeconds >= 3600 ? "hours" : maxSeconds >= 60 ? "minutes" : "seconds";
}
/**
 * Converts a duration in seconds into the requested display unit.
 * No rounding is applied, so the result may be fractional.
 *
 * @param seconds Duration in seconds.
 * @param unit Unit to express the duration in.
 * @returns The duration expressed in `unit`.
 */
function secondsToUnitValue(seconds: number, unit: DurationUnit): number {
	switch (unit) {
		case "minutes":
			return seconds / 60;
		case "hours":
			return seconds / 3600;
		default:
			return seconds;
	}
}
/**
 * Converts a value entered in the given unit back into seconds.
 * The result is neither rounded nor range-checked here.
 *
 * @param value Number typed by the user, expressed in `unit`.
 * @param unit Unit that `value` is expressed in.
 * @returns The equivalent number of seconds, or `MIN_DURATION_SECONDS`
 *   when `value` is NaN or infinite.
 */
function fromUnitValue(value: number, unit: DurationUnit): number {
	if (Number.isFinite(value)) {
		const secondsPerUnit = unit === "hours" ? 3600 : unit === "minutes" ? 60 : 1;
		return value * secondsPerUnit;
	}
	return MIN_DURATION_SECONDS;
}
/**
 * Produces the number shown in a length input for a duration held in seconds.
 *
 * @param seconds Duration in seconds.
 * @param unit Unit the input is currently displaying.
 * @returns The duration in `unit`, rounded to a whole number for seconds
 *   and to two decimal places for minutes and hours.
 */
function formatDurationForUnit(seconds: number, unit: DurationUnit): number {
	const converted = secondsToUnitValue(seconds, unit);
	// A scale of 1 rounds to whole numbers; a scale of 100 keeps two decimals.
	const scale = unit === "seconds" ? 1 : 100;
	return Math.round(converted * scale) / scale;
}
/**
 * Step increment for the numeric length inputs: tenths when editing in
 * hours, whole numbers for every other unit.
 *
 * @param unit Unit the input is currently displaying.
 * @returns The value to use for the input's `step` attribute.
 */
function durationInputStep(unit: DurationUnit): number {
	return unit === "hours" ? 0.1 : 1;
}
/**
 * Expresses the allowed duration range in the given display unit.
 * Both limits go through `formatDurationForUnit`, so they carry the same
 * rounding as the displayed values.
 *
 * @param unit Unit the inputs are currently displaying.
 * @returns `MIN_DURATION_SECONDS` and `MAX_DURATION_SECONDS` converted to `unit`.
 */
function durationUnitBounds(unit: DurationUnit): { min: number; max: number } {
	const min = formatDurationForUnit(MIN_DURATION_SECONDS, unit);
	const max = formatDurationForUnit(MAX_DURATION_SECONDS, unit);
	return { min, max };
}
/**
 * Normalizes a duration to a whole number of seconds inside the allowed range.
 *
 * @param value Candidate duration in seconds; may be fractional or non-finite.
 * @returns `value` rounded to the nearest integer and limited to
 *   `[MIN_DURATION_SECONDS, MAX_DURATION_SECONDS]`; non-finite input yields
 *   `MIN_DURATION_SECONDS`.
 */
function clampDurationSeconds(value: number): number {
	if (Number.isFinite(value)) {
		const whole = Math.round(value);
		const raised = Math.max(MIN_DURATION_SECONDS, whole);
		return Math.min(MAX_DURATION_SECONDS, raised);
	}
	return MIN_DURATION_SECONDS;
}
function voiceItems(candidates: VoiceOption[], selectedId: string): VoiceOption[] {
if (candidates.some((voice) => voice.voice_id === selectedId)) return candidates;
return [

View file

@ -47,6 +47,11 @@ export type PodcastStyle = z.infer<typeof podcastStyle>;
export const MAX_SPEAKERS = 6;
/** Largest value accepted for a duration bound: 24 hours expressed in seconds. */
export const MAX_DURATION_SECONDS = 24 * 60 * 60;
/** Smallest value accepted for a duration bound, in seconds. */
export const MIN_DURATION_SECONDS = 15;
// NOTE(review): presumably the initial min/max length for a new spec, in
// seconds — their use is not visible in this section; confirm against callers.
export const DEFAULT_MIN_SECONDS = 20;
export const DEFAULT_MAX_SECONDS = 30;
export const speakerSpec = z.object({
slot: z.number().int().min(0),
name: z.string().min(1).max(120),
@ -55,10 +60,40 @@ export const speakerSpec = z.object({
});
export type SpeakerSpec = z.infer<typeof speakerSpec>;
export const durationTarget = z.object({
min_minutes: z.number().int().min(1),
max_minutes: z.number().int().min(1),
});
/**
 * Schema for a podcast length range, stored in whole seconds.
 *
 * Input that still carries `min_minutes` (and no `min_seconds`) is converted
 * to seconds before validation. Both bounds must be integers within
 * `[MIN_DURATION_SECONDS, MAX_DURATION_SECONDS]`, and `max_seconds` must not
 * be smaller than `min_seconds`.
 */
export const durationTarget = z.preprocess(
	(raw) => {
		// Anything that is not a non-null object goes to the schema untouched.
		if (!raw || typeof raw !== "object") return raw;
		// Only the minute-based shape needs converting; seconds take precedence.
		if (!("min_minutes" in raw) || "min_seconds" in raw) return raw;
		const { min_minutes, max_minutes } = raw as { min_minutes: number; max_minutes: number };
		return {
			min_seconds: min_minutes * 60,
			max_seconds: max_minutes * 60,
		};
	},
	z
		.object({
			min_seconds: z.number().int().min(MIN_DURATION_SECONDS).max(MAX_DURATION_SECONDS),
			max_seconds: z.number().int().min(MIN_DURATION_SECONDS).max(MAX_DURATION_SECONDS),
		})
		.refine(({ min_seconds, max_seconds }) => max_seconds >= min_seconds, {
			message: "Max length must be at least min length",
			path: ["max_seconds"],
		}),
);
export type DurationTarget = z.infer<typeof durationTarget>;
export const podcastSpec = z