diff --git a/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/task_tool.py b/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/task_tool.py index f9b316e23..f6a9ff146 100644 --- a/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/task_tool.py +++ b/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/task_tool.py @@ -9,6 +9,7 @@ re-raises any new pending interrupt back to the parent. from __future__ import annotations import logging +import time from typing import Annotated, Any, NoReturn from deepagents.middleware.subagents import TASK_TOOL_DESCRIPTION @@ -19,6 +20,8 @@ from langchain_core.tools import StructuredTool from langgraph.errors import GraphInterrupt from langgraph.types import Command, Interrupt +from app.utils.perf import get_perf_logger + from .config import ( consume_surfsense_resume, drain_parent_null_resume, @@ -35,6 +38,7 @@ from .resume import ( ) logger = logging.getLogger(__name__) +_perf_log = get_perf_logger() def _reraise_stamped_subagent_interrupt( @@ -209,6 +213,7 @@ def build_task_tool_with_parent_config( ], runtime: ToolRuntime, ) -> str | Command: + atask_start = time.perf_counter() logger.info( "[hitl_route] atask ENTRY: subagent_type=%r tool_call_id=%s", subagent_type, @@ -230,8 +235,10 @@ def build_task_tool_with_parent_config( # Resume bridge — see ``task`` above. pending_id: str | None = None pending_value: Any = None + aget_state_elapsed = 0.0 aget_state = getattr(subagent, "aget_state", None) if callable(aget_state): + aget_state_start = time.perf_counter() try: snapshot = await aget_state(sub_config) pending_id, pending_value = get_first_pending_subagent_interrupt( @@ -248,32 +255,78 @@ def build_task_tool_with_parent_config( "Subagent aget_state failed; falling back to fresh ainvoke", exc_info=True, ) + finally: + aget_state_elapsed = time.perf_counter() - aget_state_start - if pending_value is not None: - resume_value = consume_surfsense_resume(runtime) - if resume_value is None: - raise RuntimeError( - f"Subagent {subagent_type!r} has a pending interrupt but no " - "surfsense_resume_value on config; resume bridge is broken." - ) - expected = hitlrequest_action_count(pending_value) - resume_value = fan_out_decisions_to_match(resume_value, expected) - # Prevent the parent's resume payload from leaking into subagent - # interrupts via langgraph's parent_scratchpad fallback. - drain_parent_null_resume(runtime) - try: - result = await subagent.ainvoke( - build_resume_command(resume_value, pending_id), - config=sub_config, - ) - except GraphInterrupt as gi: - _reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id) - else: - try: - result = await subagent.ainvoke(subagent_state, config=sub_config) - except GraphInterrupt as gi: - _reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id) - return _return_command_with_state_update(result, runtime.tool_call_id) + invoke_path = "resume" if pending_value is not None else "fresh" + ainvoke_start = time.perf_counter() + ainvoke_outcome = "ok" + try: + if pending_value is not None: + resume_value = consume_surfsense_resume(runtime) + if resume_value is None: + raise RuntimeError( + f"Subagent {subagent_type!r} has a pending interrupt but no " + "surfsense_resume_value on config; resume bridge is broken." + ) + expected = hitlrequest_action_count(pending_value) + resume_value = fan_out_decisions_to_match(resume_value, expected) + # Prevent the parent's resume payload from leaking into subagent + # interrupts via langgraph's parent_scratchpad fallback. + drain_parent_null_resume(runtime) + try: + result = await subagent.ainvoke( + build_resume_command(resume_value, pending_id), + config=sub_config, + ) + except GraphInterrupt as gi: + ainvoke_outcome = "interrupted" + _perf_log.info( + "[hitl_route] atask EXIT subagent_type=%r path=%s outcome=%s " + "aget_state=%.3fs ainvoke=%.3fs total=%.3fs", + subagent_type, + invoke_path, + ainvoke_outcome, + aget_state_elapsed, + time.perf_counter() - ainvoke_start, + time.perf_counter() - atask_start, + ) + _reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id) + else: + try: + result = await subagent.ainvoke(subagent_state, config=sub_config) + except GraphInterrupt as gi: + ainvoke_outcome = "interrupted" + _perf_log.info( + "[hitl_route] atask EXIT subagent_type=%r path=%s outcome=%s " + "aget_state=%.3fs ainvoke=%.3fs total=%.3fs", + subagent_type, + invoke_path, + ainvoke_outcome, + aget_state_elapsed, + time.perf_counter() - ainvoke_start, + time.perf_counter() - atask_start, + ) + _reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id) + ainvoke_elapsed = time.perf_counter() - ainvoke_start + except GraphInterrupt: + raise + + merge_start = time.perf_counter() + cmd = _return_command_with_state_update(result, runtime.tool_call_id) + merge_elapsed = time.perf_counter() - merge_start + _perf_log.info( + "[hitl_route] atask EXIT subagent_type=%r path=%s outcome=%s " + "aget_state=%.3fs ainvoke=%.3fs merge=%.3fs total=%.3fs", + subagent_type, + invoke_path, + ainvoke_outcome, + aget_state_elapsed, + ainvoke_elapsed, + merge_elapsed, + time.perf_counter() - atask_start, + ) + return cmd return StructuredTool.from_function( name="task",