diff --git a/docker/docker-compose.dev.yml b/docker/docker-compose.dev.yml index 2435fcb0f..5b86ea888 100644 --- a/docker/docker-compose.dev.yml +++ b/docker/docker-compose.dev.yml @@ -106,6 +106,7 @@ services: volumes: - ../surfsense_backend/app:/app/app - shared_temp:/shared_tmp + - object_store:/app/.local_object_store env_file: - ../surfsense_backend/.env extra_hosts: @@ -119,6 +120,7 @@ services: - PYTHONPATH=/app - UVICORN_LOOP=asyncio - UNSTRUCTURED_HAS_PATCHED_LOOP=1 + - FILE_STORAGE_LOCAL_PATH=/app/.local_object_store - LANGCHAIN_TRACING_V2=false - LANGSMITH_TRACING=false - AUTH_TYPE=${AUTH_TYPE:-LOCAL} @@ -171,6 +173,7 @@ services: volumes: - ../surfsense_backend/app:/app/app - shared_temp:/shared_tmp + - object_store:/app/.local_object_store env_file: - ../surfsense_backend/.env extra_hosts: @@ -182,6 +185,7 @@ services: - REDIS_APP_URL=${REDIS_URL:-redis://redis:6379/0} - CELERY_TASK_DEFAULT_QUEUE=surfsense - PYTHONPATH=/app + - FILE_STORAGE_LOCAL_PATH=/app/.local_object_store - SEARXNG_DEFAULT_HOST=${SEARXNG_DEFAULT_HOST:-http://searxng:8080} - SERVICE_ROLE=worker depends_on: @@ -277,6 +281,8 @@ volumes: name: surfsense-dev-redis shared_temp: name: surfsense-dev-shared-temp + object_store: + name: surfsense-dev-object-store zero_cache_data: name: surfsense-dev-zero-cache whatsapp_sessions: diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 3bb614c0c..1ee7ae0ed 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -129,6 +129,7 @@ services: - "8000" volumes: - shared_temp:/shared_tmp + - object_store:/app/.local_object_store env_file: - .env extra_hosts: @@ -142,6 +143,7 @@ services: PYTHONPATH: /app UVICORN_LOOP: asyncio UNSTRUCTURED_HAS_PATCHED_LOOP: "1" + FILE_STORAGE_LOCAL_PATH: /app/.local_object_store NEXT_FRONTEND_URL: ${NEXT_FRONTEND_URL:-${SURFSENSE_PUBLIC_URL:-http://localhost:${LISTEN_HTTP_PORT:-3929}}} BACKEND_URL: ${BACKEND_URL:-${SURFSENSE_PUBLIC_URL:-http://localhost:${LISTEN_HTTP_PORT:-3929}}} SEARXNG_DEFAULT_HOST: ${SEARXNG_DEFAULT_HOST:-http://searxng:8080} @@ -195,6 +197,7 @@ services: image: ghcr.io/modsetter/surfsense-backend:${SURFSENSE_VERSION:-latest}${SURFSENSE_VARIANT:+-${SURFSENSE_VARIANT}} volumes: - shared_temp:/shared_tmp + - object_store:/app/.local_object_store env_file: - .env extra_hosts: @@ -206,6 +209,7 @@ services: REDIS_APP_URL: ${REDIS_URL:-redis://redis:6379/0} CELERY_TASK_DEFAULT_QUEUE: surfsense PYTHONPATH: /app + FILE_STORAGE_LOCAL_PATH: /app/.local_object_store SEARXNG_DEFAULT_HOST: ${SEARXNG_DEFAULT_HOST:-http://searxng:8080} SERVICE_ROLE: worker depends_on: @@ -305,6 +309,8 @@ volumes: name: surfsense-redis shared_temp: name: surfsense-shared-temp + object_store: + name: surfsense-object-store zero_cache_data: name: surfsense-zero-cache caddy_data: diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index 33aa09e83..a6b2b30a3 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -1,5 +1,20 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense +# --- Database startup / safety knobs (optional) --- +# Run extension/table/index DDL on app startup. Set FALSE when schema is owned +# exclusively by Alembic migrations. +# DB_BOOTSTRAP_ON_STARTUP=TRUE +# lock_timeout (ms) for boot-time DDL so a contended CREATE INDEX/TABLE fails +# fast instead of hanging the FastAPI lifespan behind another transaction. +# DB_DDL_LOCK_TIMEOUT_MS=5000 +# idle_in_transaction_session_timeout (ms) so an abandoned "idle in transaction" +# session can't wedge the DB indefinitely. 0 disables. (asyncpg only) +# DB_IDLE_IN_TX_TIMEOUT_MS=900000 +# Same, for the Celery worker engine (long ingestion/podcast/video tasks). If a +# task hasn't touched the DB in this window it's treated as orphaned and dropped. +# 0 disables. (asyncpg only) +# DB_CELERY_IDLE_IN_TX_TIMEOUT_MS=3600000 + # Deployment environment: dev or production SURFSENSE_ENV=dev diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index 428a37377..97bbda4ef 100644 --- a/surfsense_backend/app/config/__init__.py +++ b/surfsense_backend/app/config/__init__.py @@ -486,6 +486,28 @@ class Config: # Database DATABASE_URL = os.getenv("DATABASE_URL") + # When TRUE (default) the app ensures extensions/tables/indexes exist on + # startup. Set FALSE in environments where schema is owned exclusively by + # Alembic migrations to skip all boot-time DDL. + DB_BOOTSTRAP_ON_STARTUP = ( + os.getenv("DB_BOOTSTRAP_ON_STARTUP", "TRUE").upper() == "TRUE" + ) + # Per-session lock_timeout (ms) applied to boot-time DDL so a contended + # CREATE INDEX / CREATE TABLE fails fast instead of hanging the FastAPI + # lifespan forever behind another transaction's lock. + DB_DDL_LOCK_TIMEOUT_MS = int(os.getenv("DB_DDL_LOCK_TIMEOUT_MS", "5000")) + # Global idle_in_transaction_session_timeout (ms) applied to every pooled + # connection so an abandoned "idle in transaction" session can't wedge the + # database indefinitely. 0 disables. Only applied to asyncpg connections. + DB_IDLE_IN_TX_TIMEOUT_MS = int(os.getenv("DB_IDLE_IN_TX_TIMEOUT_MS", "900000")) + # Same protection for the separate Celery worker engine, where long-running + # ingestion/podcast/video tasks live. Kept higher than the web default so a + # legitimate per-document embed window is never reaped: if a task hasn't + # touched the DB in 60 min it's treated as orphaned and dropped. 0 disables. + DB_CELERY_IDLE_IN_TX_TIMEOUT_MS = int( + os.getenv("DB_CELERY_IDLE_IN_TX_TIMEOUT_MS", "3600000") + ) + # Celery / Redis # Redis (single endpoint for Celery broker, result backend, and app cache). # Legacy CELERY_BROKER_URL / CELERY_RESULT_BACKEND / REDIS_APP_URL still diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 963fee7cd..497af06ac 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -1,3 +1,4 @@ +import logging import uuid from collections.abc import AsyncGenerator from contextlib import asynccontextmanager @@ -34,6 +35,8 @@ from app.config import config if config.AUTH_TYPE == "GOOGLE": from fastapi_users.db import SQLAlchemyBaseOAuthAccountTableUUID +logger = logging.getLogger(__name__) + DATABASE_URL = config.DATABASE_URL @@ -2726,6 +2729,28 @@ from app.podcasts.persistence import ( # noqa: E402, F401 PodcastStatus, ) + +def _build_connect_args() -> dict: + """Build driver connect_args, including a protective idle-in-transaction + timeout for asyncpg connections. + + A single abandoned ``idle in transaction`` session can hold table/row locks + indefinitely and wedge writes plus boot-time DDL (the classic "FastAPI + stuck at application startup" failure). Setting + ``idle_in_transaction_session_timeout`` server-side makes Postgres reap such + sessions automatically. It never affects sessions that are actively running + statements — only ones that opened a transaction and went idle. + """ + connect_args: dict = {} + idle_ms = config.DB_IDLE_IN_TX_TIMEOUT_MS + # ``server_settings`` is asyncpg-specific; only apply it for that driver. + if idle_ms and idle_ms > 0 and DATABASE_URL and "asyncpg" in DATABASE_URL: + connect_args["server_settings"] = { + "idle_in_transaction_session_timeout": str(idle_ms) + } + return connect_args + + engine = create_async_engine( DATABASE_URL, pool_size=30, @@ -2733,6 +2758,7 @@ engine = create_async_engine( pool_recycle=1800, pool_pre_ping=True, pool_timeout=30, + connect_args=_build_connect_args(), ) async_session_maker = async_sessionmaker(engine, expire_on_commit=False) @@ -2757,54 +2783,117 @@ async def shielded_async_session(): await session.close() -async def setup_indexes(): - async with engine.begin() as conn: - # Create indexes - # Document embedding indexes - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS document_vector_index ON documents USING hnsw (embedding public.vector_cosine_ops)" - ) - ) - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS document_search_index ON documents USING gin (to_tsvector('english', content))" - ) - ) - # Document Chuck Indexes - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS chucks_vector_index ON chunks USING hnsw (embedding public.vector_cosine_ops)" - ) - ) - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS chucks_search_index ON chunks USING gin (to_tsvector('english', content))" - ) - ) - # pg_trgm indexes for efficient ILIKE '%term%' searches on titles - # Critical for document mention picker (@mentions) to scale - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS idx_documents_title_trgm ON documents USING gin (title gin_trgm_ops)" - ) - ) - # B-tree index on search_space_id for fast filtering - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS idx_documents_search_space_id ON documents (search_space_id)" - ) - ) - # Covering index for "recent documents" query - enables index-only scan - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS idx_documents_search_space_updated ON documents (search_space_id, updated_at DESC NULLS LAST) INCLUDE (id, title, document_type)" - ) +# (index_name, table, CREATE statement). Built with CONCURRENTLY so an index +# build only takes a non-blocking ShareUpdateExclusiveLock — ingestion +# INSERT/UPDATE on documents/chunks keep flowing while the index builds, and a +# slow build can never freeze the FastAPI lifespan or block writers. +_INDEX_DEFINITIONS: list[tuple[str, str, str]] = [ + ( + "document_vector_index", + "documents", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS document_vector_index ON documents USING hnsw (embedding public.vector_cosine_ops)", + ), + ( + "document_search_index", + "documents", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS document_search_index ON documents USING gin (to_tsvector('english', content))", + ), + ( + "chucks_vector_index", + "chunks", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS chucks_vector_index ON chunks USING hnsw (embedding public.vector_cosine_ops)", + ), + ( + "chucks_search_index", + "chunks", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS chucks_search_index ON chunks USING gin (to_tsvector('english', content))", + ), + # pg_trgm index for efficient ILIKE '%term%' searches on titles — critical + # for the document mention picker (@mentions) to scale. + ( + "idx_documents_title_trgm", + "documents", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_title_trgm ON documents USING gin (title gin_trgm_ops)", + ), + ( + "idx_documents_search_space_id", + "documents", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_search_space_id ON documents (search_space_id)", + ), + # Covering index for "recent documents" query — enables index-only scan. + ( + "idx_documents_search_space_updated", + "documents", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_search_space_updated ON documents (search_space_id, updated_at DESC NULLS LAST) INCLUDE (id, title, document_type)", + ), +] + + +async def _drop_invalid_index(conn, name: str) -> None: + """Drop a leftover *invalid* index so it can be rebuilt. + + A ``CREATE INDEX CONCURRENTLY`` that is interrupted (timeout, crash, + cancellation) leaves behind an ``indisvalid = false`` index. Because the + name now exists, a later ``CREATE INDEX CONCURRENTLY IF NOT EXISTS`` would + skip it and the broken index would persist forever. Detect and drop it + first. + """ + result = await conn.execute( + text("SELECT indisvalid FROM pg_index WHERE indexrelid = to_regclass(:n)"), + {"n": name}, + ) + row = result.first() + if row is not None and row[0] is False: + logger.warning( + "[startup] dropping invalid leftover index %s before rebuild", name ) + await conn.execute(text(f'DROP INDEX CONCURRENTLY IF EXISTS "{name}"')) + + +async def setup_indexes() -> None: + """Ensure search/vector indexes exist without ever blocking startup. + + Each index is created with ``CONCURRENTLY`` (so it never takes a blocking + SHARE lock on documents/chunks) under a short per-session ``lock_timeout`` + (so a contended boot fails fast instead of hanging the lifespan forever). + Failures are logged and swallowed per-index — a missing index just gets + retried on the next boot rather than crash-looping the API. + """ + lock_timeout_ms = int(config.DB_DDL_LOCK_TIMEOUT_MS) + # AUTOCOMMIT is mandatory: CREATE INDEX CONCURRENTLY cannot run inside a + # transaction block. + async with engine.connect() as base_conn: + conn = await base_conn.execution_options(isolation_level="AUTOCOMMIT") + await conn.execute(text(f"SET lock_timeout = {lock_timeout_ms}")) + for name, table, ddl in _INDEX_DEFINITIONS: + try: + await _drop_invalid_index(conn, name) + await conn.execute(text(ddl)) + except Exception as exc: + # Non-fatal by design: a missing index is retried next boot. + logger.warning( + "[startup] index %s on %s not ready (%s: %s); " + "will retry on next boot", + name, + table, + exc.__class__.__name__, + exc, + ) async def create_db_and_tables(): + if not config.DB_BOOTSTRAP_ON_STARTUP: + logger.info( + "[startup] DB bootstrap skipped (DB_BOOTSTRAP_ON_STARTUP=FALSE); " + "schema/indexes are expected to be managed by migrations" + ) + return + + lock_timeout_ms = int(config.DB_DDL_LOCK_TIMEOUT_MS) async with engine.begin() as conn: + # Fail fast instead of hanging forever if another session holds a + # conflicting lock on a table we need to touch. + await conn.execute(text(f"SET LOCAL lock_timeout = {lock_timeout_ms}")) await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) await conn.run_sync(Base.metadata.create_all) diff --git a/surfsense_backend/app/podcasts/api/routes.py b/surfsense_backend/app/podcasts/api/routes.py index 43a99f16e..cfcb2ede9 100644 --- a/surfsense_backend/app/podcasts/api/routes.py +++ b/surfsense_backend/app/podcasts/api/routes.py @@ -27,14 +27,14 @@ from app.db import ( get_async_session, ) from app.podcasts.generation.brief import propose_brief -from app.podcasts.persistence import Podcast, PodcastRepository +from app.podcasts.persistence import Podcast, PodcastRepository, PodcastStatus from app.podcasts.service import ( InvalidTransitionError, PodcastService, PreconditionFailedError, SpecConflictError, ) -from app.podcasts.storage import open_audio_stream, purge_audio +from app.podcasts.storage import audio_exists, open_audio_stream, purge_audio from app.podcasts.tasks import draft_transcript_task from app.podcasts.tts import get_text_to_speech from app.podcasts.voices import ( @@ -172,8 +172,8 @@ async def create_podcast( session, search_space_id=body.search_space_id, speaker_count=body.speaker_count, - min_minutes=body.min_minutes, - max_minutes=body.max_minutes, + min_seconds=body.min_seconds, + max_seconds=body.max_seconds, focus=body.focus, ) await service.attach_brief(podcast, spec) @@ -287,6 +287,11 @@ async def stream_podcast( podcast = await _load(session, user, podcast_id, Permission.PODCASTS_READ) if podcast.storage_key: + # Verify first so a missing object is a 404, not a mid-stream crash. + if not await audio_exists(podcast): + raise HTTPException( + status_code=404, detail="Podcast audio is no longer available" + ) return StreamingResponse( open_audio_stream(podcast), media_type="audio/mpeg", @@ -310,7 +315,10 @@ async def stream_podcast( }, ) - raise HTTPException(status_code=404, detail="Podcast audio not found") + # No audio: terminal states never will have any, otherwise it's in flight. + if PodcastStatus(podcast.status).is_terminal: + raise HTTPException(status_code=404, detail="Podcast audio not found") + raise HTTPException(status_code=409, detail="Podcast audio is not ready yet") async def _require( diff --git a/surfsense_backend/app/podcasts/api/schemas.py b/surfsense_backend/app/podcasts/api/schemas.py index c412e372f..cb8559651 100644 --- a/surfsense_backend/app/podcasts/api/schemas.py +++ b/surfsense_backend/app/podcasts/api/schemas.py @@ -11,6 +11,12 @@ from datetime import datetime from pydantic import BaseModel, ConfigDict, Field +from app.podcasts.duration_limits import ( + DEFAULT_MAX_SECONDS, + DEFAULT_MIN_SECONDS, + MAX_DURATION_SECONDS, + MIN_DURATION_SECONDS, +) from app.podcasts.persistence import Podcast, PodcastStatus from app.podcasts.schemas import PodcastSpec, Transcript from app.podcasts.service import has_stored_episode, read_spec, read_transcript @@ -18,8 +24,6 @@ from app.podcasts.service import has_stored_episode, read_spec, read_transcript # Defaults applied when a create request omits brief sizing; the brief gate lets # the user adjust before any cost is incurred. DEFAULT_SPEAKER_COUNT = 2 -DEFAULT_MIN_MINUTES = 10 -DEFAULT_MAX_MINUTES = 20 class CreatePodcastRequest(BaseModel): @@ -30,8 +34,16 @@ class CreatePodcastRequest(BaseModel): source_content: str = Field(..., min_length=1) thread_id: int | None = None speaker_count: int = Field(default=DEFAULT_SPEAKER_COUNT, ge=1, le=6) - min_minutes: int = Field(default=DEFAULT_MIN_MINUTES, ge=1) - max_minutes: int = Field(default=DEFAULT_MAX_MINUTES, ge=1) + min_seconds: int = Field( + default=DEFAULT_MIN_SECONDS, + ge=MIN_DURATION_SECONDS, + le=MAX_DURATION_SECONDS, + ) + max_seconds: int = Field( + default=DEFAULT_MAX_SECONDS, + ge=MIN_DURATION_SECONDS, + le=MAX_DURATION_SECONDS, + ) focus: str | None = Field(default=None, max_length=2000) diff --git a/surfsense_backend/app/podcasts/duration_limits.py b/surfsense_backend/app/podcasts/duration_limits.py new file mode 100644 index 000000000..fc7d29890 --- /dev/null +++ b/surfsense_backend/app/podcasts/duration_limits.py @@ -0,0 +1,6 @@ +"""Shared bounds and defaults for podcast target duration.""" + +MAX_DURATION_SECONDS = 24 * 60 * 60 +MIN_DURATION_SECONDS = 15 +DEFAULT_MIN_SECONDS = 20 +DEFAULT_MAX_SECONDS = 30 diff --git a/surfsense_backend/app/podcasts/generation/brief/config.py b/surfsense_backend/app/podcasts/generation/brief/config.py index 4f92585ae..9b206bde4 100644 --- a/surfsense_backend/app/podcasts/generation/brief/config.py +++ b/surfsense_backend/app/podcasts/generation/brief/config.py @@ -6,10 +6,13 @@ from dataclasses import dataclass, field, fields from langchain_core.runnables import RunnableConfig +from app.podcasts.duration_limits import ( + DEFAULT_MAX_SECONDS, + DEFAULT_MIN_SECONDS, +) + # Sensible defaults for a fresh brief; the user adjusts the range at the gate. DEFAULT_SPEAKER_COUNT = 2 -DEFAULT_MIN_MINUTES = 10 -DEFAULT_MAX_MINUTES = 20 @dataclass(kw_only=True) @@ -17,8 +20,8 @@ class BriefConfig: """Signals used to propose a brief; everything here is non-LLM context.""" speaker_count: int = DEFAULT_SPEAKER_COUNT - min_minutes: int = DEFAULT_MIN_MINUTES - max_minutes: int = DEFAULT_MAX_MINUTES + min_seconds: int = DEFAULT_MIN_SECONDS + max_seconds: int = DEFAULT_MAX_SECONDS focus: str | None = None last_used_language: str | None = None last_used_voices: list[str] = field(default_factory=list) diff --git a/surfsense_backend/app/podcasts/generation/brief/nodes.py b/surfsense_backend/app/podcasts/generation/brief/nodes.py index c0a6f1ae1..de6a9717e 100644 --- a/surfsense_backend/app/podcasts/generation/brief/nodes.py +++ b/surfsense_backend/app/podcasts/generation/brief/nodes.py @@ -79,7 +79,7 @@ def propose_spec(state: BriefState, config: RunnableConfig) -> dict[str, Any]: style=PodcastStyle.CONVERSATIONAL, speakers=speakers, duration=DurationTarget( - min_minutes=brief.min_minutes, max_minutes=brief.max_minutes + min_seconds=brief.min_seconds, max_seconds=brief.max_seconds ), focus=brief.focus, ) diff --git a/surfsense_backend/app/podcasts/generation/brief/propose.py b/surfsense_backend/app/podcasts/generation/brief/propose.py index 17344702b..09d74840e 100644 --- a/surfsense_backend/app/podcasts/generation/brief/propose.py +++ b/surfsense_backend/app/podcasts/generation/brief/propose.py @@ -4,11 +4,12 @@ from __future__ import annotations from sqlalchemy.ext.asyncio import AsyncSession +from app.podcasts.duration_limits import DEFAULT_MAX_SECONDS, DEFAULT_MIN_SECONDS from app.podcasts.persistence import PodcastRepository from app.podcasts.schemas import PodcastSpec from app.podcasts.service import preferences_from -from .config import DEFAULT_MAX_MINUTES, DEFAULT_MIN_MINUTES, DEFAULT_SPEAKER_COUNT +from .config import DEFAULT_SPEAKER_COUNT from .graph import graph as brief_graph from .state import BriefState @@ -18,8 +19,8 @@ async def propose_brief( *, search_space_id: int, speaker_count: int = DEFAULT_SPEAKER_COUNT, - min_minutes: int = DEFAULT_MIN_MINUTES, - max_minutes: int = DEFAULT_MAX_MINUTES, + min_seconds: int = DEFAULT_MIN_SECONDS, + max_seconds: int = DEFAULT_MAX_SECONDS, focus: str | None = None, ) -> PodcastSpec: """Reuse the last-used language and voices, else English; return the spec.""" @@ -29,8 +30,8 @@ async def propose_brief( config = { "configurable": { "speaker_count": speaker_count, - "min_minutes": min_minutes, - "max_minutes": max_minutes, + "min_seconds": min_seconds, + "max_seconds": max_seconds, "focus": focus, "last_used_language": last_language, "last_used_voices": last_voices, diff --git a/surfsense_backend/app/podcasts/generation/transcript/nodes.py b/surfsense_backend/app/podcasts/generation/transcript/nodes.py index 44d6b219d..7b472348d 100644 --- a/surfsense_backend/app/podcasts/generation/transcript/nodes.py +++ b/surfsense_backend/app/podcasts/generation/transcript/nodes.py @@ -38,7 +38,7 @@ async def plan_outline( tc = TranscriptConfig.from_runnable_config(config) llm = await _require_llm(state, tc) - target_words = round(tc.spec.duration.midpoint_minutes * _WORDS_PER_MINUTE) + target_words = round(tc.spec.duration.midpoint_seconds * _WORDS_PER_MINUTE / 60) suggested_segments = max(1, round(target_words / _WORDS_PER_SEGMENT)) messages = [ diff --git a/surfsense_backend/app/podcasts/schemas/spec.py b/surfsense_backend/app/podcasts/schemas/spec.py index 1ef3dcfff..3799d883b 100644 --- a/surfsense_backend/app/podcasts/schemas/spec.py +++ b/surfsense_backend/app/podcasts/schemas/spec.py @@ -10,17 +10,19 @@ from __future__ import annotations import re from enum import StrEnum +from typing import Any from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator +from app.podcasts.duration_limits import ( + MAX_DURATION_SECONDS, + MIN_DURATION_SECONDS, +) + # A speaker count beyond this is almost never a real podcast and explodes the # voice/turn-attribution space, so we reject it at the brief gate. MAX_SPEAKERS = 6 -# Long-form is a goal, but an open-ended upper bound invites runaway TTS bills. -# One day of audio is a generous ceiling that still blocks obvious mistakes. -MAX_DURATION_MINUTES = 24 * 60 - # BCP-47 primary subtag plus optional region (e.g. ``en``, ``en-US``, ``pt-BR``). # Kept deliberately permissive: the voice catalog, not the brief, decides which # languages can actually be synthesised. Casing is normalised after matching. @@ -91,7 +93,7 @@ class SpeakerSpec(BaseModel): class DurationTarget(BaseModel): - """The desired finished length as an inclusive minute range. + """The desired finished length as an inclusive second range. Drafting aims for the midpoint and treats the bounds as soft guardrails; storing a range (rather than a point) keeps long-form expectations honest @@ -100,19 +102,38 @@ class DurationTarget(BaseModel): model_config = ConfigDict(extra="forbid") - min_minutes: int = Field(..., ge=1, le=MAX_DURATION_MINUTES) - max_minutes: int = Field(..., ge=1, le=MAX_DURATION_MINUTES) + min_seconds: int = Field(..., ge=MIN_DURATION_SECONDS, le=MAX_DURATION_SECONDS) + max_seconds: int = Field(..., ge=MIN_DURATION_SECONDS, le=MAX_DURATION_SECONDS) + + @model_validator(mode="before") + @classmethod + def _coerce_legacy_minutes(cls, data: Any) -> Any: + """Rows stored before seconds-based briefs still load from JSONB.""" + if ( + isinstance(data, dict) + and "min_seconds" not in data + and "min_minutes" in data + ): + migrated = dict(data) + migrated["min_seconds"] = int(migrated.pop("min_minutes")) * 60 + migrated["max_seconds"] = int(migrated.pop("max_minutes")) * 60 + return migrated + return data @model_validator(mode="after") def _check_order(self) -> DurationTarget: - if self.max_minutes < self.min_minutes: - raise ValueError("max_minutes must be >= min_minutes") + if self.max_seconds < self.min_seconds: + raise ValueError("max_seconds must be >= min_seconds") return self @property - def midpoint_minutes(self) -> float: + def midpoint_seconds(self) -> float: """The runtime drafting should aim for within the range.""" - return (self.min_minutes + self.max_minutes) / 2 + return (self.min_seconds + self.max_seconds) / 2 + + @property + def midpoint_minutes(self) -> float: + return self.midpoint_seconds / 60 class PodcastSpec(BaseModel): diff --git a/surfsense_backend/app/podcasts/storage.py b/surfsense_backend/app/podcasts/storage.py index f02429dff..c3326460d 100644 --- a/surfsense_backend/app/podcasts/storage.py +++ b/surfsense_backend/app/podcasts/storage.py @@ -42,6 +42,13 @@ def open_audio_stream(podcast: Podcast) -> AsyncIterator[bytes]: return get_storage_backend().open_stream(podcast.storage_key) +async def audio_exists(podcast: Podcast) -> bool: + """Whether the podcast's stored audio object is actually present.""" + return bool(podcast.storage_key) and await get_storage_backend().exists( + podcast.storage_key + ) + + async def purge_audio(podcast: Podcast) -> None: """Delete a podcast's stored audio if present; a missing object is fine.""" await purge_audio_object(podcast.storage_key) diff --git a/surfsense_backend/app/routes/public_chat_routes.py b/surfsense_backend/app/routes/public_chat_routes.py index 53f4c2651..516e976e6 100644 --- a/surfsense_backend/app/routes/public_chat_routes.py +++ b/surfsense_backend/app/routes/public_chat_routes.py @@ -103,8 +103,14 @@ async def stream_public_podcast( if storage_key: from app.file_storage.factory import get_storage_backend + backend = get_storage_backend() + # Verify first so a missing object is a 404, not a mid-stream crash. + if not await backend.exists(storage_key): + raise HTTPException( + status_code=404, detail="Podcast audio is no longer available" + ) return StreamingResponse( - get_storage_backend().open_stream(storage_key), + backend.open_stream(storage_key), media_type="audio/mpeg", headers={"Accept-Ranges": "bytes"}, ) diff --git a/surfsense_backend/app/tasks/celery_tasks/__init__.py b/surfsense_backend/app/tasks/celery_tasks/__init__.py index 6ea7a2e68..a1113884f 100644 --- a/surfsense_backend/app/tasks/celery_tasks/__init__.py +++ b/surfsense_backend/app/tasks/celery_tasks/__init__.py @@ -32,10 +32,27 @@ def get_celery_session_maker() -> async_sessionmaker: """ global _celery_engine, _celery_session_maker if _celery_session_maker is None: + # Reap connections orphaned mid-transaction (e.g. a worker that hung or + # crashed mid-index) so they can't hold locks on documents/chunks and + # wedge writes — the failure mode that previously left an "idle in + # transaction" session holding locks for 11+ hours. Kept generous so a + # legitimate long per-document embed window is never killed. + connect_args: dict = {} + idle_ms = config.DB_CELERY_IDLE_IN_TX_TIMEOUT_MS + if ( + idle_ms + and idle_ms > 0 + and config.DATABASE_URL + and "asyncpg" in config.DATABASE_URL + ): + connect_args["server_settings"] = { + "idle_in_transaction_session_timeout": str(idle_ms) + } _celery_engine = create_async_engine( config.DATABASE_URL, poolclass=NullPool, echo=False, + connect_args=connect_args, ) with contextlib.suppress(Exception): from app.observability.bootstrap import instrument_sqlalchemy_engine diff --git a/surfsense_backend/tests/integration/podcasts/conftest.py b/surfsense_backend/tests/integration/podcasts/conftest.py index f244c17d2..75248a6a1 100644 --- a/surfsense_backend/tests/integration/podcasts/conftest.py +++ b/surfsense_backend/tests/integration/podcasts/conftest.py @@ -120,6 +120,9 @@ class FakeStorageBackend: async def open_stream(self, key: str) -> AsyncIterator[bytes]: yield self.objects.get(key, b"audio-bytes") + async def exists(self, key: str) -> bool: + return key in self.objects + async def delete(self, key: str) -> None: self.deleted.append(key) @@ -214,7 +217,7 @@ def build_spec( slot=1, name="Guest", role=SpeakerRole.GUEST, voice_id=voice_ids[1] ), ], - duration=DurationTarget(min_minutes=10, max_minutes=20), + duration=DurationTarget(min_seconds=600, max_seconds=1200), ) diff --git a/surfsense_backend/tests/integration/podcasts/test_draft_task.py b/surfsense_backend/tests/integration/podcasts/test_draft_task.py index e9c9e4a9c..014d98b1f 100644 --- a/surfsense_backend/tests/integration/podcasts/test_draft_task.py +++ b/surfsense_backend/tests/integration/podcasts/test_draft_task.py @@ -76,7 +76,7 @@ async def test_quota_denial_fails_the_podcast_without_a_transcript( async def _deny(**_kwargs): raise QuotaInsufficientError( usage_type="podcast_generation", - balance_micros=0, + balance_micros=5_000_000, remaining_micros=0, ) yield # pragma: no cover - unreachable, satisfies the CM protocol diff --git a/surfsense_backend/tests/integration/podcasts/test_public_stream.py b/surfsense_backend/tests/integration/podcasts/test_public_stream.py index d2ba1d1b9..63f634234 100644 --- a/surfsense_backend/tests/integration/podcasts/test_public_stream.py +++ b/surfsense_backend/tests/integration/podcasts/test_public_stream.py @@ -48,6 +48,22 @@ async def test_public_stream_serves_audio_via_storage_key( assert resp.content == b"public-audio" +async def test_public_stream_404_when_object_missing( + client, db_session, db_search_space, db_user, fake_storage +): + await _snapshot( + db_session, + search_space_id=db_search_space.id, + user=db_user, + token="tok-gone", + podcasts=[{"original_id": 556, "storage_key": "podcasts/gone.mp3"}], + ) + + resp = await client.get("/api/v1/public/tok-gone/podcasts/556/stream") + + assert resp.status_code == 404 + + async def test_public_stream_404_when_podcast_absent_from_snapshot( client, db_session, db_search_space, db_user ): diff --git a/surfsense_backend/tests/integration/podcasts/test_streaming.py b/surfsense_backend/tests/integration/podcasts/test_streaming.py index 82456bac9..b924e2971 100644 --- a/surfsense_backend/tests/integration/podcasts/test_streaming.py +++ b/surfsense_backend/tests/integration/podcasts/test_streaming.py @@ -1,8 +1,7 @@ """Streaming a podcast's rendered audio over HTTP. -A ready podcast streams its bytes from the storage backend; a podcast with no -stored audio returns 404. Storage is an in-memory backend (the object store is a -system boundary). +A ready podcast streams its bytes; an in-flight one is 409, a stored-but-missing +object is 404. Storage is an in-memory backend (the object store is a boundary). """ from __future__ import annotations @@ -31,11 +30,23 @@ async def test_stream_serves_stored_audio( assert resp.content == b"the-audio" -async def test_stream_404_when_no_audio(client, db_search_space, make_podcast): +async def test_stream_409_while_in_flight(client, db_search_space, make_podcast): podcast = await make_podcast( search_space_id=db_search_space.id, status=PodcastStatus.DRAFTING ) resp = await client.get(f"{BASE}/{podcast.id}/stream") + assert resp.status_code == 409 + + +async def test_stream_404_when_object_missing( + client, db_search_space, make_podcast, fake_storage +): + podcast = await make_podcast( + search_space_id=db_search_space.id, status=PodcastStatus.READY + ) + + resp = await client.get(f"{BASE}/{podcast.id}/stream") + assert resp.status_code == 404 diff --git a/surfsense_backend/tests/unit/podcasts/conftest.py b/surfsense_backend/tests/unit/podcasts/conftest.py index 5eb4d8457..c77eb1cc6 100644 --- a/surfsense_backend/tests/unit/podcasts/conftest.py +++ b/surfsense_backend/tests/unit/podcasts/conftest.py @@ -31,8 +31,8 @@ def make_spec(): language: str = "en", style: PodcastStyle = PodcastStyle.CONVERSATIONAL, speakers: list[SpeakerSpec] | None = None, - min_minutes: int = 10, - max_minutes: int = 20, + min_seconds: int = 600, + max_seconds: int = 1200, focus: str | None = None, ) -> PodcastSpec: if speakers is None: @@ -54,7 +54,7 @@ def make_spec(): language=language, style=style, speakers=speakers, - duration=DurationTarget(min_minutes=min_minutes, max_minutes=max_minutes), + duration=DurationTarget(min_seconds=min_seconds, max_seconds=max_seconds), focus=focus, ) diff --git a/surfsense_backend/tests/unit/podcasts/test_renderer.py b/surfsense_backend/tests/unit/podcasts/test_renderer.py index 2bcdff967..bb7b8f181 100644 --- a/surfsense_backend/tests/unit/podcasts/test_renderer.py +++ b/surfsense_backend/tests/unit/podcasts/test_renderer.py @@ -66,7 +66,7 @@ def _spec(voice_id: str) -> PodcastSpec: speakers=[ SpeakerSpec(slot=0, name="Host", role=SpeakerRole.HOST, voice_id=voice_id) ], - duration=DurationTarget(min_minutes=5, max_minutes=10), + duration=DurationTarget(min_seconds=300, max_seconds=600), ) diff --git a/surfsense_backend/tests/unit/podcasts/test_spec.py b/surfsense_backend/tests/unit/podcasts/test_spec.py index 4efd530e9..77e720286 100644 --- a/surfsense_backend/tests/unit/podcasts/test_spec.py +++ b/surfsense_backend/tests/unit/podcasts/test_spec.py @@ -57,7 +57,7 @@ def test_spec_normalizes_its_language_on_construction(): spec = PodcastSpec( language="EN-us", speakers=[_speaker(0)], - duration=DurationTarget(min_minutes=5, max_minutes=10), + duration=DurationTarget(min_seconds=300, max_seconds=600), ) assert spec.language == "en-us" @@ -68,7 +68,7 @@ def test_speakers_must_have_unique_slots(): PodcastSpec( language="en", speakers=[_speaker(0), _speaker(0, voice_id="kokoro:af_bella")], - duration=DurationTarget(min_minutes=5, max_minutes=10), + duration=DurationTarget(min_seconds=300, max_seconds=600), ) @@ -77,7 +77,7 @@ def test_a_brief_needs_at_least_one_speaker(): PodcastSpec( language="en", speakers=[], - duration=DurationTarget(min_minutes=5, max_minutes=10), + duration=DurationTarget(min_seconds=300, max_seconds=600), ) @@ -86,7 +86,7 @@ def test_a_monologue_brief_carries_exactly_one_speaker(): language="en", style=PodcastStyle.MONOLOGUE, speakers=[_speaker(0)], - duration=DurationTarget(min_minutes=5, max_minutes=10), + duration=DurationTarget(min_seconds=300, max_seconds=600), ) assert spec.style is PodcastStyle.MONOLOGUE @@ -98,18 +98,25 @@ def test_a_monologue_brief_rejects_multiple_speakers(): language="en", style=PodcastStyle.MONOLOGUE, speakers=[_speaker(0), _speaker(1, voice_id="kokoro:af_bella")], - duration=DurationTarget(min_minutes=5, max_minutes=10), + duration=DurationTarget(min_seconds=300, max_seconds=600), ) def test_duration_rejects_an_inverted_range(): """A max below the min is a user error caught at the brief gate.""" with pytest.raises(ValidationError): - DurationTarget(min_minutes=20, max_minutes=10) + DurationTarget(min_seconds=1200, max_seconds=600) def test_duration_midpoint_is_where_drafting_aims(): - assert DurationTarget(min_minutes=10, max_minutes=20).midpoint_minutes == 15 + assert DurationTarget(min_seconds=600, max_seconds=1200).midpoint_seconds == 900 + assert DurationTarget(min_seconds=600, max_seconds=1200).midpoint_minutes == 15 + + +def test_duration_loads_legacy_minute_fields_from_json(): + duration = DurationTarget.model_validate({"min_minutes": 10, "max_minutes": 20}) + assert duration.min_seconds == 600 + assert duration.max_seconds == 1200 def test_blank_focus_becomes_absent(): @@ -117,7 +124,7 @@ def test_blank_focus_becomes_absent(): spec = PodcastSpec( language="en", speakers=[_speaker(0)], - duration=DurationTarget(min_minutes=5, max_minutes=10), + duration=DurationTarget(min_seconds=300, max_seconds=600), focus=" ", ) assert spec.focus is None @@ -127,7 +134,7 @@ def test_speaker_for_returns_the_speaker_bound_to_a_slot(): spec = PodcastSpec( language="en", speakers=[_speaker(0), _speaker(1, voice_id="kokoro:af_bella")], - duration=DurationTarget(min_minutes=5, max_minutes=10), + duration=DurationTarget(min_seconds=300, max_seconds=600), ) assert spec.speaker_for(1).voice_id == "kokoro:af_bella" @@ -136,7 +143,7 @@ def test_speaker_for_raises_when_no_speaker_matches(): spec = PodcastSpec( language="en", speakers=[_speaker(0)], - duration=DurationTarget(min_minutes=5, max_minutes=10), + duration=DurationTarget(min_seconds=300, max_seconds=600), ) with pytest.raises(KeyError): spec.speaker_for(99) diff --git a/surfsense_backend/tests/unit/services/test_quota_checked_vision_llm.py b/surfsense_backend/tests/unit/services/test_quota_checked_vision_llm.py index 17df89135..0f5dd531f 100644 --- a/surfsense_backend/tests/unit/services/test_quota_checked_vision_llm.py +++ b/surfsense_backend/tests/unit/services/test_quota_checked_vision_llm.py @@ -105,7 +105,7 @@ async def test_ainvoke_propagates_quota_insufficient_error(monkeypatch): async def _denying_billable_call(**_kwargs): raise QuotaInsufficientError( usage_type="vision_extraction", - balance_micros=0, + balance_micros=5_000_000, remaining_micros=0, ) yield # unreachable but required for asynccontextmanager type diff --git a/surfsense_backend/tests/unit/tasks/test_video_presentation_billing.py b/surfsense_backend/tests/unit/tasks/test_video_presentation_billing.py index 97c1551a5..7183024ed 100644 --- a/surfsense_backend/tests/unit/tasks/test_video_presentation_billing.py +++ b/surfsense_backend/tests/unit/tasks/test_video_presentation_billing.py @@ -98,7 +98,7 @@ async def _denying_billable_call(**kwargs): _CALL_LOG.append(kwargs) raise QuotaInsufficientError( usage_type=kwargs.get("usage_type", "?"), - balance_micros=0, + balance_micros=5_000_000, remaining_micros=0, ) yield SimpleNamespace() # pragma: no cover diff --git a/surfsense_web/components/tool-ui/podcast/brief-review.tsx b/surfsense_web/components/tool-ui/podcast/brief-review.tsx index d662aebc2..98616643a 100644 --- a/surfsense_web/components/tool-ui/podcast/brief-review.tsx +++ b/surfsense_web/components/tool-ui/podcast/brief-review.tsx @@ -24,8 +24,10 @@ import { } from "@/components/ui/select"; import { Textarea } from "@/components/ui/textarea"; import { + MAX_DURATION_SECONDS, type LanguageOptions, MAX_SPEAKERS, + MIN_DURATION_SECONDS, type PodcastSpec, type PodcastStyle, podcastStyle, @@ -65,6 +67,9 @@ interface BriefReviewProps { */ export function BriefReview({ podcast, spec }: BriefReviewProps) { const [draft, setDraft] = useState(spec); + const [durationUnit, setDurationUnit] = useState(() => + defaultDurationUnit(spec.duration.max_seconds), + ); const [voices, setVoices] = useState(null); const [offering, setOffering] = useState(null); const [isSubmitting, setIsSubmitting] = useState(false); @@ -74,6 +79,7 @@ export function BriefReview({ podcast, spec }: BriefReviewProps) { // biome-ignore lint/correctness/useExhaustiveDependencies: reset only when the server version moves useEffect(() => { setDraft(spec); + setDurationUnit(defaultDurationUnit(spec.duration.max_seconds)); }, [podcast.specVersion]); useEffect(() => { @@ -326,39 +332,72 @@ export function BriefReview({ podcast, spec }: BriefReviewProps) { ))} -
-
- - - setDraft((current) => ({ - ...current, - duration: { ...current.duration, min_minutes: Number(e.target.value) || 1 }, - })) - } - /> +
+
+ +
-
- - - setDraft((current) => ({ - ...current, - duration: { - ...current.duration, - max_minutes: Number(e.target.value) || current.duration.min_minutes, - }, - })) - } - /> +
+
+ + { + const seconds = clampDurationSeconds( + fromUnitValue(Number(e.target.value), durationUnit), + ); + setDraft((current) => ({ + ...current, + duration: { ...current.duration, min_seconds: seconds }, + })); + }} + /> +
+
+ + { + const parsed = Number(e.target.value); + const fallback = secondsToUnitValue( + draft.duration.min_seconds, + durationUnit, + ); + const seconds = clampDurationSeconds( + fromUnitValue( + Number.isFinite(parsed) ? parsed : fallback, + durationUnit, + ), + ); + setDraft((current) => ({ + ...current, + duration: { ...current.duration, max_seconds: seconds }, + })); + }} + /> +
@@ -387,7 +426,9 @@ export function BriefReview({ podcast, spec }: BriefReviewProps) {