mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-27 19:25:15 +02:00
perf(subagent): add atask EXIT breakdown timing log
This commit is contained in:
parent
9e81f2a35b
commit
33bfce4406
1 changed files with 78 additions and 25 deletions
|
|
@ -9,6 +9,7 @@ re-raises any new pending interrupt back to the parent.
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
from typing import Annotated, Any, NoReturn
|
||||
|
||||
from deepagents.middleware.subagents import TASK_TOOL_DESCRIPTION
|
||||
|
|
@ -19,6 +20,8 @@ from langchain_core.tools import StructuredTool
|
|||
from langgraph.errors import GraphInterrupt
|
||||
from langgraph.types import Command, Interrupt
|
||||
|
||||
from app.utils.perf import get_perf_logger
|
||||
|
||||
from .config import (
|
||||
consume_surfsense_resume,
|
||||
drain_parent_null_resume,
|
||||
|
|
@ -35,6 +38,7 @@ from .resume import (
|
|||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
_perf_log = get_perf_logger()
|
||||
|
||||
|
||||
def _reraise_stamped_subagent_interrupt(
|
||||
|
|
@ -209,6 +213,7 @@ def build_task_tool_with_parent_config(
|
|||
],
|
||||
runtime: ToolRuntime,
|
||||
) -> str | Command:
|
||||
atask_start = time.perf_counter()
|
||||
logger.info(
|
||||
"[hitl_route] atask ENTRY: subagent_type=%r tool_call_id=%s",
|
||||
subagent_type,
|
||||
|
|
@ -230,8 +235,10 @@ def build_task_tool_with_parent_config(
|
|||
# Resume bridge — see ``task`` above.
|
||||
pending_id: str | None = None
|
||||
pending_value: Any = None
|
||||
aget_state_elapsed = 0.0
|
||||
aget_state = getattr(subagent, "aget_state", None)
|
||||
if callable(aget_state):
|
||||
aget_state_start = time.perf_counter()
|
||||
try:
|
||||
snapshot = await aget_state(sub_config)
|
||||
pending_id, pending_value = get_first_pending_subagent_interrupt(
|
||||
|
|
@ -248,32 +255,78 @@ def build_task_tool_with_parent_config(
|
|||
"Subagent aget_state failed; falling back to fresh ainvoke",
|
||||
exc_info=True,
|
||||
)
|
||||
finally:
|
||||
aget_state_elapsed = time.perf_counter() - aget_state_start
|
||||
|
||||
if pending_value is not None:
|
||||
resume_value = consume_surfsense_resume(runtime)
|
||||
if resume_value is None:
|
||||
raise RuntimeError(
|
||||
f"Subagent {subagent_type!r} has a pending interrupt but no "
|
||||
"surfsense_resume_value on config; resume bridge is broken."
|
||||
)
|
||||
expected = hitlrequest_action_count(pending_value)
|
||||
resume_value = fan_out_decisions_to_match(resume_value, expected)
|
||||
# Prevent the parent's resume payload from leaking into subagent
|
||||
# interrupts via langgraph's parent_scratchpad fallback.
|
||||
drain_parent_null_resume(runtime)
|
||||
try:
|
||||
result = await subagent.ainvoke(
|
||||
build_resume_command(resume_value, pending_id),
|
||||
config=sub_config,
|
||||
)
|
||||
except GraphInterrupt as gi:
|
||||
_reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id)
|
||||
else:
|
||||
try:
|
||||
result = await subagent.ainvoke(subagent_state, config=sub_config)
|
||||
except GraphInterrupt as gi:
|
||||
_reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id)
|
||||
return _return_command_with_state_update(result, runtime.tool_call_id)
|
||||
invoke_path = "resume" if pending_value is not None else "fresh"
|
||||
ainvoke_start = time.perf_counter()
|
||||
ainvoke_outcome = "ok"
|
||||
try:
|
||||
if pending_value is not None:
|
||||
resume_value = consume_surfsense_resume(runtime)
|
||||
if resume_value is None:
|
||||
raise RuntimeError(
|
||||
f"Subagent {subagent_type!r} has a pending interrupt but no "
|
||||
"surfsense_resume_value on config; resume bridge is broken."
|
||||
)
|
||||
expected = hitlrequest_action_count(pending_value)
|
||||
resume_value = fan_out_decisions_to_match(resume_value, expected)
|
||||
# Prevent the parent's resume payload from leaking into subagent
|
||||
# interrupts via langgraph's parent_scratchpad fallback.
|
||||
drain_parent_null_resume(runtime)
|
||||
try:
|
||||
result = await subagent.ainvoke(
|
||||
build_resume_command(resume_value, pending_id),
|
||||
config=sub_config,
|
||||
)
|
||||
except GraphInterrupt as gi:
|
||||
ainvoke_outcome = "interrupted"
|
||||
_perf_log.info(
|
||||
"[hitl_route] atask EXIT subagent_type=%r path=%s outcome=%s "
|
||||
"aget_state=%.3fs ainvoke=%.3fs total=%.3fs",
|
||||
subagent_type,
|
||||
invoke_path,
|
||||
ainvoke_outcome,
|
||||
aget_state_elapsed,
|
||||
time.perf_counter() - ainvoke_start,
|
||||
time.perf_counter() - atask_start,
|
||||
)
|
||||
_reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id)
|
||||
else:
|
||||
try:
|
||||
result = await subagent.ainvoke(subagent_state, config=sub_config)
|
||||
except GraphInterrupt as gi:
|
||||
ainvoke_outcome = "interrupted"
|
||||
_perf_log.info(
|
||||
"[hitl_route] atask EXIT subagent_type=%r path=%s outcome=%s "
|
||||
"aget_state=%.3fs ainvoke=%.3fs total=%.3fs",
|
||||
subagent_type,
|
||||
invoke_path,
|
||||
ainvoke_outcome,
|
||||
aget_state_elapsed,
|
||||
time.perf_counter() - ainvoke_start,
|
||||
time.perf_counter() - atask_start,
|
||||
)
|
||||
_reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id)
|
||||
ainvoke_elapsed = time.perf_counter() - ainvoke_start
|
||||
except GraphInterrupt:
|
||||
raise
|
||||
|
||||
merge_start = time.perf_counter()
|
||||
cmd = _return_command_with_state_update(result, runtime.tool_call_id)
|
||||
merge_elapsed = time.perf_counter() - merge_start
|
||||
_perf_log.info(
|
||||
"[hitl_route] atask EXIT subagent_type=%r path=%s outcome=%s "
|
||||
"aget_state=%.3fs ainvoke=%.3fs merge=%.3fs total=%.3fs",
|
||||
subagent_type,
|
||||
invoke_path,
|
||||
ainvoke_outcome,
|
||||
aget_state_elapsed,
|
||||
ainvoke_elapsed,
|
||||
merge_elapsed,
|
||||
time.perf_counter() - atask_start,
|
||||
)
|
||||
return cmd
|
||||
|
||||
return StructuredTool.from_function(
|
||||
name="task",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue