From 89cc3b37ee7a90f7c45366dbf7c60dececfa8947 Mon Sep 17 00:00:00 2001 From: "DESKTOP-RTLN3BA\\$punk" Date: Tue, 16 Jun 2026 16:18:49 -0700 Subject: [PATCH] fix(db): prevent boot-time index DDL from hanging FastAPI startup A single abandoned "idle in transaction" session held locks on the documents table, which blocked the non-concurrent CREATE INDEX (hnsw) run inside the FastAPI lifespan. Each API restart queued another CREATE INDEX behind an advisory lock, leaving the server stuck at "Waiting for application startup." indefinitely and freezing ingestion writes. Changes: - setup_indexes(): build every index with CREATE INDEX CONCURRENTLY (non-blocking ShareUpdateExclusiveLock) under a per-session lock_timeout, and make each statement non-fatal so a contended/slow build is retried next boot instead of wedging startup. Drop leftover invalid indexes before rebuilding. - create_db_and_tables(): apply lock_timeout to extension/create_all DDL and gate the whole bootstrap behind DB_BOOTSTRAP_ON_STARTUP. - engine: set idle_in_transaction_session_timeout (asyncpg) so an abandoned transaction is reaped automatically. - config + .env.example: DB_BOOTSTRAP_ON_STARTUP, DB_DDL_LOCK_TIMEOUT_MS, DB_IDLE_IN_TX_TIMEOUT_MS. Co-authored-by: Cursor --- surfsense_backend/.env.example | 11 ++ surfsense_backend/app/config/__init__.py | 15 ++ surfsense_backend/app/db.py | 175 +++++++++++++++++------ 3 files changed, 158 insertions(+), 43 deletions(-) diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index b4f67328c..d6c0a634e 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -1,5 +1,16 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense +# --- Database startup / safety knobs (optional) --- +# Run extension/table/index DDL on app startup. Set FALSE when schema is owned +# exclusively by Alembic migrations. 
+# DB_BOOTSTRAP_ON_STARTUP=TRUE +# lock_timeout (ms) for boot-time DDL so a contended CREATE INDEX/TABLE fails +# fast instead of hanging the FastAPI lifespan behind another transaction. +# DB_DDL_LOCK_TIMEOUT_MS=5000 +# idle_in_transaction_session_timeout (ms) so an abandoned "idle in transaction" +# session can't wedge the DB indefinitely. 0 disables. (asyncpg only) +# DB_IDLE_IN_TX_TIMEOUT_MS=900000 + # Deployment environment: dev or production SURFSENSE_ENV=dev diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index bbaf3ac55..6b090491f 100644 --- a/surfsense_backend/app/config/__init__.py +++ b/surfsense_backend/app/config/__init__.py @@ -541,6 +541,21 @@ class Config: # Database DATABASE_URL = os.getenv("DATABASE_URL") + # When TRUE (default) the app ensures extensions/tables/indexes exist on + # startup. Set FALSE in environments where schema is owned exclusively by + # Alembic migrations to skip all boot-time DDL. + DB_BOOTSTRAP_ON_STARTUP = ( + os.getenv("DB_BOOTSTRAP_ON_STARTUP", "TRUE").upper() == "TRUE" + ) + # Per-session lock_timeout (ms) applied to boot-time DDL so a contended + # CREATE INDEX / CREATE TABLE fails fast instead of hanging the FastAPI + # lifespan forever behind another transaction's lock. + DB_DDL_LOCK_TIMEOUT_MS = int(os.getenv("DB_DDL_LOCK_TIMEOUT_MS", "5000")) + # Global idle_in_transaction_session_timeout (ms) applied to every pooled + # connection so an abandoned "idle in transaction" session can't wedge the + # database indefinitely. 0 disables. Only applied to asyncpg connections. + DB_IDLE_IN_TX_TIMEOUT_MS = int(os.getenv("DB_IDLE_IN_TX_TIMEOUT_MS", "900000")) + # Celery / Redis # Redis (single endpoint for Celery broker, result backend, and app cache). 
# Legacy CELERY_BROKER_URL / CELERY_RESULT_BACKEND / REDIS_APP_URL still diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 2d672131b..dd8f9d19c 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -1,3 +1,4 @@ +import logging import uuid from collections.abc import AsyncGenerator from contextlib import asynccontextmanager @@ -34,6 +35,8 @@ from app.config import config if config.AUTH_TYPE == "GOOGLE": from fastapi_users.db import SQLAlchemyBaseOAuthAccountTableUUID +logger = logging.getLogger(__name__) + DATABASE_URL = config.DATABASE_URL @@ -2871,6 +2874,28 @@ from app.podcasts.persistence import ( # noqa: E402, F401 PodcastStatus, ) + +def _build_connect_args() -> dict: + """Build driver connect_args, including a protective idle-in-transaction + timeout for asyncpg connections. + + A single abandoned ``idle in transaction`` session can hold table/row locks + indefinitely and wedge writes plus boot-time DDL (the classic "FastAPI + stuck at application startup" failure). Setting + ``idle_in_transaction_session_timeout`` server-side makes Postgres reap such + sessions automatically. It never affects sessions that are actively running + statements — only ones that opened a transaction and went idle. + """ + connect_args: dict = {} + idle_ms = config.DB_IDLE_IN_TX_TIMEOUT_MS + # ``server_settings`` is asyncpg-specific; only apply it for that driver. 
+ if idle_ms and idle_ms > 0 and DATABASE_URL and "asyncpg" in DATABASE_URL: + connect_args["server_settings"] = { + "idle_in_transaction_session_timeout": str(idle_ms) + } + return connect_args + + engine = create_async_engine( DATABASE_URL, pool_size=30, @@ -2878,6 +2903,7 @@ engine = create_async_engine( pool_recycle=1800, pool_pre_ping=True, pool_timeout=30, + connect_args=_build_connect_args(), ) async_session_maker = async_sessionmaker(engine, expire_on_commit=False) @@ -2902,54 +2928,117 @@ async def shielded_async_session(): await session.close() -async def setup_indexes(): - async with engine.begin() as conn: - # Create indexes - # Document embedding indexes - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS document_vector_index ON documents USING hnsw (embedding public.vector_cosine_ops)" - ) - ) - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS document_search_index ON documents USING gin (to_tsvector('english', content))" - ) - ) - # Document Chuck Indexes - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS chucks_vector_index ON chunks USING hnsw (embedding public.vector_cosine_ops)" - ) - ) - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS chucks_search_index ON chunks USING gin (to_tsvector('english', content))" - ) - ) - # pg_trgm indexes for efficient ILIKE '%term%' searches on titles - # Critical for document mention picker (@mentions) to scale - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS idx_documents_title_trgm ON documents USING gin (title gin_trgm_ops)" - ) - ) - # B-tree index on search_space_id for fast filtering - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS idx_documents_search_space_id ON documents (search_space_id)" - ) - ) - # Covering index for "recent documents" query - enables index-only scan - await conn.execute( - text( - "CREATE INDEX IF NOT EXISTS idx_documents_search_space_updated ON documents (search_space_id, updated_at DESC NULLS LAST) INCLUDE (id, title, 
document_type)" - ) +# (index_name, table, CREATE statement). Built with CONCURRENTLY so an index +# build only takes a non-blocking ShareUpdateExclusiveLock — ingestion +# INSERT/UPDATE on documents/chunks keep flowing while the index builds, so a +# slow build never blocks writers (the caller still awaits each build). +_INDEX_DEFINITIONS: list[tuple[str, str, str]] = [ + ( + "document_vector_index", + "documents", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS document_vector_index ON documents USING hnsw (embedding public.vector_cosine_ops)", + ), + ( + "document_search_index", + "documents", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS document_search_index ON documents USING gin (to_tsvector('english', content))", + ), + ( + "chucks_vector_index", + "chunks", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS chucks_vector_index ON chunks USING hnsw (embedding public.vector_cosine_ops)", + ), + ( + "chucks_search_index", + "chunks", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS chucks_search_index ON chunks USING gin (to_tsvector('english', content))", + ), + # pg_trgm index for efficient ILIKE '%term%' searches on titles — critical + # for the document mention picker (@mentions) to scale. + ( + "idx_documents_title_trgm", + "documents", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_title_trgm ON documents USING gin (title gin_trgm_ops)", + ), + ( + "idx_documents_search_space_id", + "documents", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_search_space_id ON documents (search_space_id)", + ), + # Covering index for "recent documents" query — enables index-only scan. + ( + "idx_documents_search_space_updated", + "documents", + "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_search_space_updated ON documents (search_space_id, updated_at DESC NULLS LAST) INCLUDE (id, title, document_type)", + ), +] + + +async def _drop_invalid_index(conn, name: str) -> None: + """Drop a leftover *invalid* index so it can be rebuilt. 
+ + A ``CREATE INDEX CONCURRENTLY`` that is interrupted (timeout, crash, + cancellation) leaves behind an ``indisvalid = false`` index. Because the + name now exists, a later ``CREATE INDEX CONCURRENTLY IF NOT EXISTS`` would + skip it and the broken index would persist forever. Detect and drop it + first. + """ + result = await conn.execute( + text("SELECT indisvalid FROM pg_index WHERE indexrelid = to_regclass(:n)"), + {"n": name}, + ) + row = result.first() + if row is not None and row[0] is False: + logger.warning( + "[startup] dropping invalid leftover index %s before rebuild", name ) + await conn.execute(text(f'DROP INDEX CONCURRENTLY IF EXISTS "{name}"')) + + +async def setup_indexes() -> None: + """Ensure search/vector indexes exist without ever blocking startup. + + Each index is created with ``CONCURRENTLY`` (so it never takes a blocking + SHARE lock on documents/chunks) under a short per-session ``lock_timeout`` + (so a contended boot fails fast instead of hanging the lifespan forever). + Failures are logged and swallowed per-index — a missing index just gets + retried on the next boot rather than crash-looping the API. + """ + lock_timeout_ms = int(config.DB_DDL_LOCK_TIMEOUT_MS) + # AUTOCOMMIT is mandatory: CREATE INDEX CONCURRENTLY cannot run inside a + # transaction block. + async with engine.connect() as base_conn: + conn = await base_conn.execution_options(isolation_level="AUTOCOMMIT") + await conn.execute(text(f"SET lock_timeout = {lock_timeout_ms}")) + for name, table, ddl in _INDEX_DEFINITIONS: + try: + await _drop_invalid_index(conn, name) + await conn.execute(text(ddl)) + except Exception as exc: + # Non-fatal by design: a missing index is retried next boot. 
+ logger.warning( + "[startup] index %s on %s not ready (%s: %s); " + "will retry on next boot", + name, + table, + exc.__class__.__name__, + exc, + ) async def create_db_and_tables(): + if not config.DB_BOOTSTRAP_ON_STARTUP: + logger.info( + "[startup] DB bootstrap skipped (DB_BOOTSTRAP_ON_STARTUP=FALSE); " + "schema/indexes are expected to be managed by migrations" + ) + return + + lock_timeout_ms = int(config.DB_DDL_LOCK_TIMEOUT_MS) async with engine.begin() as conn: + # Fail fast instead of hanging forever if another session holds a + # conflicting lock on a table we need to touch. + await conn.execute(text(f"SET LOCAL lock_timeout = {lock_timeout_ms}")) await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) await conn.run_sync(Base.metadata.create_all)