fix(automations/agent_task): use in-memory checkpointer to avoid Celery PoolTimeout

The shared AsyncPostgresSaver caches DB connections in a module-level
pool. Cached connections are bound to the asyncio loop that opened
them, but `run_async_celery_task` discards the loop on each task's
exit — so after the first task the pool holds connections pointing
to a dead loop, and the next automation hangs 30s before failing
with `PoolTimeout: couldn't get a connection after 30.00 sec`.

Swap agent_task to `InMemorySaver`; automation runs only need state
within one Celery task, so nothing is lost. Site-local TODO tracks
the proper future fix (dispose the checkpointer pool around each
Celery task, mirroring `_dispose_shared_db_engine`).
This commit is contained in:
CREDO23 2026-05-28 21:10:24 +02:00
parent 353755fd73
commit 958bf9f95a

View file

@ -5,17 +5,17 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any from typing import Any
from langgraph.checkpoint.memory import InMemorySaver
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.tasks.chat.streaming.flows.shared.llm_bundle import load_llm_bundle from app.tasks.chat.streaming.flows.shared.llm_bundle import load_llm_bundle
from app.tasks.chat.streaming.flows.shared.pre_stream_setup import ( from app.tasks.chat.streaming.flows.shared.pre_stream_setup import (
get_chat_checkpointer,
setup_connector_and_firecrawl, setup_connector_and_firecrawl,
) )
class DependencyError(Exception): class DependencyError(Exception):
"""An external dependency (LLM config, checkpointer, ...) refused to load.""" """An external dependency (LLM config, connector service, ...) refused to load."""
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
@ -34,7 +34,7 @@ async def build_dependencies(
session: AsyncSession, session: AsyncSession,
search_space_id: int, search_space_id: int,
) -> AgentDependencies: ) -> AgentDependencies:
"""Load the LLM bundle, connector service, and checkpointer for one invoke. """Load the LLM bundle, connector service, and a per-invoke in-memory checkpointer.
Uses the search space's default LLM config (``config_id=-1``). Per-step Uses the search space's default LLM config (``config_id=-1``). Per-step
model overrides land in a future iteration alongside the ``model`` param. model overrides land in a future iteration alongside the ``model`` param.
@ -48,7 +48,24 @@ async def build_dependencies(
connector_service, firecrawl_api_key = await setup_connector_and_firecrawl( connector_service, firecrawl_api_key = await setup_connector_and_firecrawl(
session, search_space_id=search_space_id session, search_space_id=search_space_id
) )
checkpointer = await get_chat_checkpointer() # Quick fix: use an in-memory checkpointer for automation runs.
#
# The shared Postgres checkpointer caches DB connections in a
# module-level pool. Each cached connection is bound to the asyncio
# loop that opened it. Celery throws away the loop after every task,
# so the pool ends up full of connections pointing to a dead loop,
# and the next Celery task (running on a fresh loop) can't use any
# of them — it hangs 30s and fails with
# `PoolTimeout: couldn't get a connection after 30.00 sec`.
#
# InMemorySaver has no cached connections, no loop binding — each
# Celery task creates one and drops it on exit.
#
# TODO(checkpointer): proper fix is to dispose the checkpointer
# pool around each Celery task in `run_async_celery_task`, the same
# way `_dispose_shared_db_engine` already does for the SQLAlchemy
# pool. Then this site can switch back to the shared checkpointer.
checkpointer = InMemorySaver()
return AgentDependencies( return AgentDependencies(
llm=llm, llm=llm,
agent_config=agent_config, agent_config=agent_config,