fix(stream): route every agent (re)build through one helper to prevent factory drift

This commit is contained in:
CREDO23 2026-05-05 23:35:23 +02:00
parent 657c31fdf4
commit 3cb2c3056e
5 changed files with 60 additions and 19 deletions

View file

@ -28,9 +28,7 @@ from langchain_core.messages import HumanMessage
from sqlalchemy.future import select
from sqlalchemy.orm import selectinload
from app.agents.multi_agent_chat import (
create_surfsense_deep_agent as create_multi_agent_chat,
)
from app.agents.multi_agent_chat import create_multi_agent_chat_deep_agent
from app.agents.new_chat.chat_deepagent import create_surfsense_deep_agent
from app.agents.new_chat.checkpointer import get_checkpointer
from app.agents.new_chat.context import SurfSenseContextSchema
@ -577,6 +575,43 @@ async def _preflight_llm(llm: Any) -> None:
)
async def _build_main_agent_for_thread(
agent_factory: Any,
*,
llm: Any,
search_space_id: int,
db_session: Any,
connector_service: ConnectorService,
checkpointer: Any,
user_id: str | None,
thread_id: int | None,
agent_config: AgentConfig | None,
firecrawl_api_key: str | None,
thread_visibility: ChatVisibility | None,
filesystem_selection: FilesystemSelection | None,
disabled_tools: list[str] | None = None,
mentioned_document_ids: list[int] | None = None,
) -> Any:
"""Single (re)build path so the agent factory cannot drift across
initial build, preflight repin, and mid-stream 429 recovery for one
``thread_id``: a graph swap mid-turn would corrupt checkpointer state."""
return await agent_factory(
llm=llm,
search_space_id=search_space_id,
db_session=db_session,
connector_service=connector_service,
checkpointer=checkpointer,
user_id=user_id,
thread_id=thread_id,
agent_config=agent_config,
firecrawl_api_key=firecrawl_api_key,
thread_visibility=thread_visibility,
filesystem_selection=filesystem_selection,
disabled_tools=disabled_tools,
mentioned_document_ids=mentioned_document_ids,
)
async def _settle_speculative_agent_build(task: asyncio.Task[Any]) -> None:
"""Wait for a discarded speculative agent build to release shared state.
@ -2767,7 +2802,7 @@ async def stream_new_chat(
_t0 = time.perf_counter()
agent_factory = (
create_multi_agent_chat
create_multi_agent_chat_deep_agent
if use_multi_agent
else create_surfsense_deep_agent
)
@ -2776,7 +2811,8 @@ async def stream_new_chat(
# if preflight reports 429 we will discard this future and rebuild
# against the freshly pinned config below.
agent_build_task = asyncio.create_task(
agent_factory(
_build_main_agent_for_thread(
agent_factory,
llm=llm,
search_space_id=search_space_id,
db_session=session,
@ -2787,9 +2823,9 @@ async def stream_new_chat(
agent_config=agent_config,
firecrawl_api_key=firecrawl_api_key,
thread_visibility=visibility,
filesystem_selection=filesystem_selection,
disabled_tools=disabled_tools,
mentioned_document_ids=mentioned_document_ids,
filesystem_selection=filesystem_selection,
),
name="agent_build:stream_new_chat",
)
@ -3466,7 +3502,8 @@ async def stream_new_chat(
title_task = None
_t0 = time.perf_counter()
agent = await create_surfsense_deep_agent(
agent = await _build_main_agent_for_thread(
agent_factory,
llm=llm,
search_space_id=search_space_id,
db_session=session,
@ -3477,9 +3514,9 @@ async def stream_new_chat(
agent_config=agent_config,
firecrawl_api_key=firecrawl_api_key,
thread_visibility=visibility,
filesystem_selection=filesystem_selection,
disabled_tools=disabled_tools,
mentioned_document_ids=mentioned_document_ids,
filesystem_selection=filesystem_selection,
)
_perf_log.info(
"[stream_new_chat] Runtime rate-limit recovery repinned "
@ -4130,12 +4167,13 @@ async def stream_resume_chat(
_t0 = time.perf_counter()
agent_factory = (
create_multi_agent_chat
create_multi_agent_chat_deep_agent
if _app_config.MULTI_AGENT_CHAT_ENABLED
else create_surfsense_deep_agent
)
agent_build_task = asyncio.create_task(
agent_factory(
_build_main_agent_for_thread(
agent_factory,
llm=llm,
search_space_id=search_space_id,
db_session=session,
@ -4224,7 +4262,8 @@ async def stream_resume_chat(
"fallback_config_id": llm_config_id,
},
)
agent = await agent_factory(
agent = await _build_main_agent_for_thread(
agent_factory,
llm=llm,
search_space_id=search_space_id,
db_session=session,
@ -4409,7 +4448,8 @@ async def stream_resume_chat(
raise stream_exc
_t0 = time.perf_counter()
agent = await create_surfsense_deep_agent(
agent = await _build_main_agent_for_thread(
agent_factory,
llm=llm,
search_space_id=search_space_id,
db_session=session,
@ -4421,6 +4461,7 @@ async def stream_resume_chat(
firecrawl_api_key=firecrawl_api_key,
thread_visibility=visibility,
filesystem_selection=filesystem_selection,
disabled_tools=disabled_tools,
)
_perf_log.info(
"[stream_resume] Runtime rate-limit recovery repinned "