diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index b4f67328c..7e5c9b6f0 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -1,5 +1,20 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense +# --- Database startup / safety knobs (optional) --- +# Run extension/table/index DDL on app startup. Set FALSE when schema is owned +# exclusively by Alembic migrations. +# DB_BOOTSTRAP_ON_STARTUP=TRUE +# lock_timeout (ms) for boot-time DDL so a contended CREATE INDEX/TABLE fails +# fast instead of hanging the FastAPI lifespan behind another transaction. +# DB_DDL_LOCK_TIMEOUT_MS=5000 +# idle_in_transaction_session_timeout (ms) so an abandoned "idle in transaction" +# session can't wedge the DB indefinitely. 0 disables. (asyncpg only) +# DB_IDLE_IN_TX_TIMEOUT_MS=900000 +# Same, for the Celery worker engine (long ingestion/podcast/video tasks). If a +# task hasn't touched the DB in this window it's treated as orphaned and dropped. +# 0 disables. (asyncpg only) +# DB_CELERY_IDLE_IN_TX_TIMEOUT_MS=3600000 + # Deployment environment: dev or production SURFSENSE_ENV=dev diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index bbaf3ac55..69fb023fe 100644 --- a/surfsense_backend/app/config/__init__.py +++ b/surfsense_backend/app/config/__init__.py @@ -541,6 +541,28 @@ class Config: # Database DATABASE_URL = os.getenv("DATABASE_URL") + # When TRUE (default) the app ensures extensions/tables/indexes exist on + # startup. Set FALSE in environments where schema is owned exclusively by + # Alembic migrations to skip all boot-time DDL. + DB_BOOTSTRAP_ON_STARTUP = ( + os.getenv("DB_BOOTSTRAP_ON_STARTUP", "TRUE").upper() == "TRUE" + ) + # Per-session lock_timeout (ms) applied to boot-time DDL so a contended + # CREATE INDEX / CREATE TABLE fails fast instead of hanging the FastAPI + # lifespan forever behind another transaction's lock. 
+ DB_DDL_LOCK_TIMEOUT_MS = int(os.getenv("DB_DDL_LOCK_TIMEOUT_MS", "5000")) + # Global idle_in_transaction_session_timeout (ms) applied to every pooled + # connection so an abandoned "idle in transaction" session can't wedge the + # database indefinitely. 0 disables. Only applied to asyncpg connections. + DB_IDLE_IN_TX_TIMEOUT_MS = int(os.getenv("DB_IDLE_IN_TX_TIMEOUT_MS", "900000")) + # Same protection for the separate Celery worker engine, where long-running + # ingestion/podcast/video tasks live. Kept higher than the web default so a + # legitimate per-document embed window is never reaped: if a task hasn't + # touched the DB in 60 min it's treated as orphaned and dropped. 0 disables. + DB_CELERY_IDLE_IN_TX_TIMEOUT_MS = int( + os.getenv("DB_CELERY_IDLE_IN_TX_TIMEOUT_MS", "3600000") + ) + # Celery / Redis # Redis (single endpoint for Celery broker, result backend, and app cache). # Legacy CELERY_BROKER_URL / CELERY_RESULT_BACKEND / REDIS_APP_URL still diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 2d672131b..dd8f9d19c 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -1,3 +1,4 @@ +import logging import uuid from collections.abc import AsyncGenerator from contextlib import asynccontextmanager @@ -34,6 +35,8 @@ from app.config import config if config.AUTH_TYPE == "GOOGLE": from fastapi_users.db import SQLAlchemyBaseOAuthAccountTableUUID +logger = logging.getLogger(__name__) + DATABASE_URL = config.DATABASE_URL @@ -2871,6 +2874,28 @@ from app.podcasts.persistence import ( # noqa: E402, F401 PodcastStatus, ) + +def _build_connect_args() -> dict: + """Build driver connect_args, including a protective idle-in-transaction + timeout for asyncpg connections. + + A single abandoned ``idle in transaction`` session can hold table/row locks + indefinitely and wedge writes plus boot-time DDL (the classic "FastAPI + stuck at application startup" failure). 
Setting + ``idle_in_transaction_session_timeout`` server-side makes Postgres reap such + sessions automatically. It never affects sessions that are actively running + statements — only ones that opened a transaction and went idle. + """ + connect_args: dict = {} + idle_ms = config.DB_IDLE_IN_TX_TIMEOUT_MS + # ``server_settings`` is asyncpg-specific; only apply it for that driver. + if idle_ms and idle_ms > 0 and DATABASE_URL and "asyncpg" in DATABASE_URL: + connect_args["server_settings"] = { + "idle_in_transaction_session_timeout": str(idle_ms) + } + return connect_args + + engine = create_async_engine( DATABASE_URL, pool_size=30, @@ -2878,6 +2903,7 @@ engine = create_async_engine( pool_recycle=1800, pool_pre_ping=True, pool_timeout=30, + connect_args=_build_connect_args(), ) async_session_maker = async_sessionmaker(engine, expire_on_commit=False) @@ -2902,54 +2928,117 @@ async def shielded_async_session(): await session.close() -async def setup_indexes(): - async with engine.begin() as conn: - # Create indexes - # Document embedding indexes - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS document_vector_index ON documents USING hnsw (embedding public.vector_cosine_ops)" - ) - ) - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS document_search_index ON documents USING gin (to_tsvector('english', content))" - ) - ) - # Document Chuck Indexes - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS chucks_vector_index ON chunks USING hnsw (embedding public.vector_cosine_ops)" - ) - ) - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS chucks_search_index ON chunks USING gin (to_tsvector('english', content))" - ) - ) - # pg_trgm indexes for efficient ILIKE '%term%' searches on titles - # Critical for document mention picker (@mentions) to scale - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS idx_documents_title_trgm ON documents USING gin (title gin_trgm_ops)" - ) - ) - # B-tree index on search_space_id for fast filtering 
# (index_name, table, CREATE statement) for every search/vector index ensured
# at startup. Each statement uses CONCURRENTLY, so the build takes a
# ShareUpdateExclusiveLock instead of a write-blocking SHARE lock and ingestion
# INSERT/UPDATE on documents/chunks keep flowing while it runs. IF NOT EXISTS
# makes re-running a statement against an existing index a no-op.
_INDEX_DEFINITIONS: list[tuple[str, str, str]] = [
    (
        "document_vector_index",
        "documents",
        "CREATE INDEX CONCURRENTLY IF NOT EXISTS document_vector_index ON documents USING hnsw (embedding public.vector_cosine_ops)",
    ),
    (
        "document_search_index",
        "documents",
        "CREATE INDEX CONCURRENTLY IF NOT EXISTS document_search_index ON documents USING gin (to_tsvector('english', content))",
    ),
    (
        "chucks_vector_index",
        "chunks",
        "CREATE INDEX CONCURRENTLY IF NOT EXISTS chucks_vector_index ON chunks USING hnsw (embedding public.vector_cosine_ops)",
    ),
    (
        "chucks_search_index",
        "chunks",
        "CREATE INDEX CONCURRENTLY IF NOT EXISTS chucks_search_index ON chunks USING gin (to_tsvector('english', content))",
    ),
    # pg_trgm index for efficient ILIKE '%term%' searches on titles (used by
    # the document mention picker).
    (
        "idx_documents_title_trgm",
        "documents",
        "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_title_trgm ON documents USING gin (title gin_trgm_ops)",
    ),
    # B-tree index on search_space_id for fast filtering.
    (
        "idx_documents_search_space_id",
        "documents",
        "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_search_space_id ON documents (search_space_id)",
    ),
    # Covering index for the "recent documents" query (index-only scan).
    (
        "idx_documents_search_space_updated",
        "documents",
        "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_search_space_updated ON documents (search_space_id, updated_at DESC NULLS LAST) INCLUDE (id, title, document_type)",
    ),
]


def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


async def _drop_invalid_index(conn, name: str) -> None:
    """Drop a leftover *invalid* index called ``name`` so it can be rebuilt.

    A ``CREATE INDEX CONCURRENTLY`` that is interrupted (timeout, crash,
    cancellation) leaves behind an index with ``indisvalid = false``. Because
    the name then exists, a later ``CREATE INDEX CONCURRENTLY IF NOT EXISTS``
    skips it and the broken index would persist. This looks the index up in
    ``pg_index`` and drops it only when it is present and marked invalid.

    Args:
        conn: Connection to run on. ``DROP INDEX CONCURRENTLY`` cannot run
            inside a transaction block, so the caller is expected to pass an
            AUTOCOMMIT connection.
        name: Unqualified index name.
    """
    result = await conn.execute(
        text("SELECT indisvalid FROM pg_index WHERE indexrelid = to_regclass(:n)"),
        {"n": name},
    )
    row = result.first()
    # ``row is None`` means no such index; only an explicit False is a leftover.
    if row is None or row[0] is not False:
        return
    logger.warning(
        "[startup] dropping invalid leftover index %s before rebuild", name
    )
    # Identifiers cannot be bound as parameters, so quote the name explicitly.
    await conn.execute(
        text(f"DROP INDEX CONCURRENTLY IF EXISTS {_quote_identifier(name)}")
    )


async def setup_indexes() -> None:
    """Ensure the search/vector indexes exist without blocking writers.

    Every entry of ``_INDEX_DEFINITIONS`` is built with ``CONCURRENTLY`` under
    a session ``lock_timeout`` of ``config.DB_DDL_LOCK_TIMEOUT_MS``, so a
    contended lock wait fails fast instead of hanging. Note that
    ``lock_timeout`` bounds lock *waits* only: this coroutine still awaits each
    build, so a first-time build on a large table delays its caller for as
    long as the build takes.

    Failures are logged and swallowed per index. A missing index is simply
    attempted again the next time this runs, rather than aborting startup.
    """
    lock_timeout_ms = int(config.DB_DDL_LOCK_TIMEOUT_MS)
    # AUTOCOMMIT is mandatory: CREATE/DROP INDEX CONCURRENTLY cannot run inside
    # a transaction block.
    async with engine.connect() as base_conn:
        conn = await base_conn.execution_options(isolation_level="AUTOCOMMIT")
        await conn.execute(text(f"SET lock_timeout = {lock_timeout_ms}"))
        try:
            for name, table, ddl in _INDEX_DEFINITIONS:
                try:
                    await _drop_invalid_index(conn, name)
                    await conn.execute(text(ddl))
                except Exception as exc:
                    # Non-fatal by design: a missing index is retried next boot.
                    logger.warning(
                        "[startup] index %s on %s not ready (%s: %s); "
                        "will retry on next boot",
                        name,
                        table,
                        exc.__class__.__name__,
                        exc,
                    )
        finally:
            # The SET above is session-level (there is no transaction in
            # AUTOCOMMIT mode), and the pool's rollback-on-return does not undo
            # it. Without this RESET the short DDL lock timeout would follow
            # the connection back into the pool and apply to whichever request
            # checks it out next.
            try:
                await conn.execute(text("RESET lock_timeout"))
            except Exception as exc:
                logger.warning(
                    "[startup] could not reset lock_timeout after index setup "
                    "(%s: %s)",
                    exc.__class__.__name__,
                    exc,
                )
Kept generous so a + # legitimate long per-document embed window is never killed. + connect_args: dict = {} + idle_ms = config.DB_CELERY_IDLE_IN_TX_TIMEOUT_MS + if ( + idle_ms + and idle_ms > 0 + and config.DATABASE_URL + and "asyncpg" in config.DATABASE_URL + ): + connect_args["server_settings"] = { + "idle_in_transaction_session_timeout": str(idle_ms) + } _celery_engine = create_async_engine( config.DATABASE_URL, poolclass=NullPool, echo=False, + connect_args=connect_args, ) with contextlib.suppress(Exception): from app.observability.bootstrap import instrument_sqlalchemy_engine