From 958bf9f95ad0bf32450b0aa86bc99218c683aad3 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Thu, 28 May 2026 21:10:24 +0200 Subject: [PATCH] fix(automations/agent_task): use in-memory checkpointer to avoid Celery PoolTimeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The shared AsyncPostgresSaver caches DB connections in a module-level pool. Cached connections are bound to the asyncio loop that opened them, but `run_async_celery_task` discards the loop on each task's exit — so after the first task the pool holds connections pointing to a dead loop, and the next automation hangs 30s before failing with `PoolTimeout: couldn't get a connection after 30.00 sec`. Swap agent_task to `InMemorySaver`; automation runs only need state within one Celery task, so nothing is lost. Site-local TODO tracks the proper future fix (dispose the checkpointer pool around each Celery task, mirroring `_dispose_shared_db_engine`). --- .../actions/agent_task/dependencies.py | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/surfsense_backend/app/automations/actions/agent_task/dependencies.py b/surfsense_backend/app/automations/actions/agent_task/dependencies.py index 12273aa0f..79107cd65 100644 --- a/surfsense_backend/app/automations/actions/agent_task/dependencies.py +++ b/surfsense_backend/app/automations/actions/agent_task/dependencies.py @@ -5,17 +5,17 @@ from __future__ import annotations from dataclasses import dataclass from typing import Any +from langgraph.checkpoint.memory import InMemorySaver from sqlalchemy.ext.asyncio import AsyncSession from app.tasks.chat.streaming.flows.shared.llm_bundle import load_llm_bundle from app.tasks.chat.streaming.flows.shared.pre_stream_setup import ( - get_chat_checkpointer, setup_connector_and_firecrawl, ) class DependencyError(Exception): - """An external dependency (LLM config, checkpointer, ...) refused to load.""" + """An external dependency (LLM config, connector service, ...) refused to load.""" @dataclass(frozen=True, slots=True) @@ -34,7 +34,7 @@ async def build_dependencies( session: AsyncSession, search_space_id: int, ) -> AgentDependencies: - """Load the LLM bundle, connector service, and checkpointer for one invoke. + """Load the LLM bundle, connector service, and a per-invoke in-memory checkpointer. Uses the search space's default LLM config (``config_id=-1``). Per-step model overrides land in a future iteration alongside the ``model`` param. @@ -48,7 +48,24 @@ async def build_dependencies( connector_service, firecrawl_api_key = await setup_connector_and_firecrawl( session, search_space_id=search_space_id ) - checkpointer = await get_chat_checkpointer() + # Quick fix: use an in-memory checkpointer for automation runs. + # + # The shared Postgres checkpointer caches DB connections in a + # module-level pool. Each cached connection is bound to the asyncio + # loop that opened it. Celery throws away the loop after every task, + # so the pool ends up full of connections pointing to a dead loop, + # and the next Celery task (running on a fresh loop) can't use any + # of them — it hangs 30s and fails with + # `PoolTimeout: couldn't get a connection after 30.00 sec`. + # + # InMemorySaver has no cached connections, no loop binding — each + # Celery task creates one and drops it on exit. + # + # TODO(checkpointer): proper fix is to dispose the checkpointer + # pool around each Celery task in `run_async_celery_task`, the same + # way `_dispose_shared_db_engine` already does for the SQLAlchemy + # pool. Then this site can switch back to the shared checkpointer. + checkpointer = InMemorySaver() return AgentDependencies( llm=llm, agent_config=agent_config,