Merge pull request #1502 from MODSetter/fix/db-startup-index-lock-hang

hotpatch: Fix/db startup index lock hang
This commit is contained in:
Rohan Verma 2026-06-16 16:28:38 -07:00 committed by GitHub
commit 7ce409c580
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 186 additions and 43 deletions

View file

@ -1,5 +1,20 @@
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense
# --- Database startup / safety knobs (optional) ---
# Run extension/table/index DDL on app startup. Set FALSE when schema is owned
# exclusively by Alembic migrations.
# DB_BOOTSTRAP_ON_STARTUP=TRUE
# lock_timeout (ms) for boot-time DDL so a contended CREATE INDEX/TABLE fails
# fast instead of hanging the FastAPI lifespan behind another transaction.
# DB_DDL_LOCK_TIMEOUT_MS=5000
# idle_in_transaction_session_timeout (ms) so an abandoned "idle in transaction"
# session can't wedge the DB indefinitely. 0 disables. (asyncpg only)
# DB_IDLE_IN_TX_TIMEOUT_MS=900000
# Same, for the Celery worker engine (long ingestion/podcast/video tasks). If a
# task hasn't touched the DB in this window it's treated as orphaned and dropped.
# 0 disables. (asyncpg only)
# DB_CELERY_IDLE_IN_TX_TIMEOUT_MS=3600000
# Deployment environment: dev or production
SURFSENSE_ENV=dev

View file

@ -541,6 +541,28 @@ class Config:
# Database
DATABASE_URL = os.getenv("DATABASE_URL")
# When TRUE (default) the app ensures extensions/tables/indexes exist on
# startup. Set FALSE in environments where schema is owned exclusively by
# Alembic migrations to skip all boot-time DDL.
DB_BOOTSTRAP_ON_STARTUP = (
os.getenv("DB_BOOTSTRAP_ON_STARTUP", "TRUE").upper() == "TRUE"
)
# Per-session lock_timeout (ms) applied to boot-time DDL so a contended
# CREATE INDEX / CREATE TABLE fails fast instead of hanging the FastAPI
# lifespan forever behind another transaction's lock.
DB_DDL_LOCK_TIMEOUT_MS = int(os.getenv("DB_DDL_LOCK_TIMEOUT_MS", "5000"))
# Global idle_in_transaction_session_timeout (ms) applied to every pooled
# connection so an abandoned "idle in transaction" session can't wedge the
# database indefinitely. 0 disables. Only applied to asyncpg connections.
DB_IDLE_IN_TX_TIMEOUT_MS = int(os.getenv("DB_IDLE_IN_TX_TIMEOUT_MS", "900000"))
# Same protection for the separate Celery worker engine, where long-running
# ingestion/podcast/video tasks live. Kept higher than the web default so a
# legitimate per-document embed window is never reaped: if a task hasn't
# touched the DB in 60 min it's treated as orphaned and dropped. 0 disables.
DB_CELERY_IDLE_IN_TX_TIMEOUT_MS = int(
os.getenv("DB_CELERY_IDLE_IN_TX_TIMEOUT_MS", "3600000")
)
# Celery / Redis
# Redis (single endpoint for Celery broker, result backend, and app cache).
# Legacy CELERY_BROKER_URL / CELERY_RESULT_BACKEND / REDIS_APP_URL still

View file

@ -1,3 +1,4 @@
import logging
import uuid
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
@ -34,6 +35,8 @@ from app.config import config
if config.AUTH_TYPE == "GOOGLE":
from fastapi_users.db import SQLAlchemyBaseOAuthAccountTableUUID
logger = logging.getLogger(__name__)
DATABASE_URL = config.DATABASE_URL
@ -2871,6 +2874,28 @@ from app.podcasts.persistence import ( # noqa: E402, F401
PodcastStatus,
)
def _build_connect_args() -> dict:
"""Build driver connect_args, including a protective idle-in-transaction
timeout for asyncpg connections.
A single abandoned ``idle in transaction`` session can hold table/row locks
indefinitely and wedge writes plus boot-time DDL (the classic "FastAPI
stuck at application startup" failure). Setting
``idle_in_transaction_session_timeout`` server-side makes Postgres reap such
sessions automatically. It never affects sessions that are actively running
statements only ones that opened a transaction and went idle.
"""
connect_args: dict = {}
idle_ms = config.DB_IDLE_IN_TX_TIMEOUT_MS
# ``server_settings`` is asyncpg-specific; only apply it for that driver.
if idle_ms and idle_ms > 0 and DATABASE_URL and "asyncpg" in DATABASE_URL:
connect_args["server_settings"] = {
"idle_in_transaction_session_timeout": str(idle_ms)
}
return connect_args
engine = create_async_engine(
DATABASE_URL,
pool_size=30,
@ -2878,6 +2903,7 @@ engine = create_async_engine(
pool_recycle=1800,
pool_pre_ping=True,
pool_timeout=30,
connect_args=_build_connect_args(),
)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
@ -2902,54 +2928,117 @@ async def shielded_async_session():
await session.close()
async def setup_indexes():
async with engine.begin() as conn:
# Create indexes
# Document embedding indexes
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS document_vector_index ON documents USING hnsw (embedding public.vector_cosine_ops)"
)
)
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS document_search_index ON documents USING gin (to_tsvector('english', content))"
)
)
# Document Chuck Indexes
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS chucks_vector_index ON chunks USING hnsw (embedding public.vector_cosine_ops)"
)
)
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS chucks_search_index ON chunks USING gin (to_tsvector('english', content))"
)
)
# pg_trgm indexes for efficient ILIKE '%term%' searches on titles
# Critical for document mention picker (@mentions) to scale
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_documents_title_trgm ON documents USING gin (title gin_trgm_ops)"
)
)
# B-tree index on search_space_id for fast filtering
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_documents_search_space_id ON documents (search_space_id)"
)
)
# Covering index for "recent documents" query - enables index-only scan
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_documents_search_space_updated ON documents (search_space_id, updated_at DESC NULLS LAST) INCLUDE (id, title, document_type)"
)
# (index_name, table, CREATE statement). Built with CONCURRENTLY so an index
# build only takes a non-blocking ShareUpdateExclusiveLock — ingestion
# INSERT/UPDATE on documents/chunks keep flowing while the index builds, and a
# slow build can never freeze the FastAPI lifespan or block writers.
_INDEX_DEFINITIONS: list[tuple[str, str, str]] = [
(
"document_vector_index",
"documents",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS document_vector_index ON documents USING hnsw (embedding public.vector_cosine_ops)",
),
(
"document_search_index",
"documents",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS document_search_index ON documents USING gin (to_tsvector('english', content))",
),
(
"chucks_vector_index",
"chunks",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS chucks_vector_index ON chunks USING hnsw (embedding public.vector_cosine_ops)",
),
(
"chucks_search_index",
"chunks",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS chucks_search_index ON chunks USING gin (to_tsvector('english', content))",
),
# pg_trgm index for efficient ILIKE '%term%' searches on titles — critical
# for the document mention picker (@mentions) to scale.
(
"idx_documents_title_trgm",
"documents",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_title_trgm ON documents USING gin (title gin_trgm_ops)",
),
(
"idx_documents_search_space_id",
"documents",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_search_space_id ON documents (search_space_id)",
),
# Covering index for "recent documents" query — enables index-only scan.
(
"idx_documents_search_space_updated",
"documents",
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_search_space_updated ON documents (search_space_id, updated_at DESC NULLS LAST) INCLUDE (id, title, document_type)",
),
]
async def _drop_invalid_index(conn, name: str) -> None:
"""Drop a leftover *invalid* index so it can be rebuilt.
A ``CREATE INDEX CONCURRENTLY`` that is interrupted (timeout, crash,
cancellation) leaves behind an ``indisvalid = false`` index. Because the
name now exists, a later ``CREATE INDEX CONCURRENTLY IF NOT EXISTS`` would
skip it and the broken index would persist forever. Detect and drop it
first.
"""
result = await conn.execute(
text("SELECT indisvalid FROM pg_index WHERE indexrelid = to_regclass(:n)"),
{"n": name},
)
row = result.first()
if row is not None and row[0] is False:
logger.warning(
"[startup] dropping invalid leftover index %s before rebuild", name
)
await conn.execute(text(f'DROP INDEX CONCURRENTLY IF EXISTS "{name}"'))
async def setup_indexes() -> None:
"""Ensure search/vector indexes exist without ever blocking startup.
Each index is created with ``CONCURRENTLY`` (so it never takes a blocking
SHARE lock on documents/chunks) under a short per-session ``lock_timeout``
(so a contended boot fails fast instead of hanging the lifespan forever).
Failures are logged and swallowed per-index a missing index just gets
retried on the next boot rather than crash-looping the API.
"""
lock_timeout_ms = int(config.DB_DDL_LOCK_TIMEOUT_MS)
# AUTOCOMMIT is mandatory: CREATE INDEX CONCURRENTLY cannot run inside a
# transaction block.
async with engine.connect() as base_conn:
conn = await base_conn.execution_options(isolation_level="AUTOCOMMIT")
await conn.execute(text(f"SET lock_timeout = {lock_timeout_ms}"))
for name, table, ddl in _INDEX_DEFINITIONS:
try:
await _drop_invalid_index(conn, name)
await conn.execute(text(ddl))
except Exception as exc:
# Non-fatal by design: a missing index is retried next boot.
logger.warning(
"[startup] index %s on %s not ready (%s: %s); "
"will retry on next boot",
name,
table,
exc.__class__.__name__,
exc,
)
async def create_db_and_tables():
if not config.DB_BOOTSTRAP_ON_STARTUP:
logger.info(
"[startup] DB bootstrap skipped (DB_BOOTSTRAP_ON_STARTUP=FALSE); "
"schema/indexes are expected to be managed by migrations"
)
return
lock_timeout_ms = int(config.DB_DDL_LOCK_TIMEOUT_MS)
async with engine.begin() as conn:
# Fail fast instead of hanging forever if another session holds a
# conflicting lock on a table we need to touch.
await conn.execute(text(f"SET LOCAL lock_timeout = {lock_timeout_ms}"))
await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm"))
await conn.run_sync(Base.metadata.create_all)

View file

@ -32,10 +32,27 @@ def get_celery_session_maker() -> async_sessionmaker:
"""
global _celery_engine, _celery_session_maker
if _celery_session_maker is None:
# Reap connections orphaned mid-transaction (e.g. a worker that hung or
# crashed mid-index) so they can't hold locks on documents/chunks and
# wedge writes — the failure mode that previously left an "idle in
# transaction" session holding locks for 11+ hours. Kept generous so a
# legitimate long per-document embed window is never killed.
connect_args: dict = {}
idle_ms = config.DB_CELERY_IDLE_IN_TX_TIMEOUT_MS
if (
idle_ms
and idle_ms > 0
and config.DATABASE_URL
and "asyncpg" in config.DATABASE_URL
):
connect_args["server_settings"] = {
"idle_in_transaction_session_timeout": str(idle_ms)
}
_celery_engine = create_async_engine(
config.DATABASE_URL,
poolclass=NullPool,
echo=False,
connect_args=connect_args,
)
with contextlib.suppress(Exception):
from app.observability.bootstrap import instrument_sqlalchemy_engine