diff --git a/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/task_tool.py b/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/task_tool.py index f6a9ff146..c3babab83 100644 --- a/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/task_tool.py +++ b/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/task_tool.py @@ -20,6 +20,7 @@ from langchain_core.tools import StructuredTool from langgraph.errors import GraphInterrupt from langgraph.types import Command, Interrupt +from app.observability import metrics as ot_metrics, otel as ot from app.utils.perf import get_perf_logger from .config import ( @@ -173,6 +174,9 @@ def build_task_tool_with_parent_config( exc_info=True, ) + invoke_path = "resume" if pending_value is not None else "fresh" + invoke_start = time.perf_counter() + invoke_outcome = "ok" if pending_value is not None: resume_value = consume_surfsense_resume(runtime) if resume_value is None: @@ -188,18 +192,94 @@ def build_task_tool_with_parent_config( # Prevent the parent's resume payload from leaking into subagent # interrupts via langgraph's parent_scratchpad fallback. drain_parent_null_resume(runtime) - try: - result = subagent.invoke( - build_resume_command(resume_value, pending_id), - config=sub_config, - ) - except GraphInterrupt as gi: - _reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id) + with ot.subagent_invoke_span( + subagent_type=subagent_type, path=invoke_path + ) as sp: + try: + result = subagent.invoke( + build_resume_command(resume_value, pending_id), + config=sub_config, + ) + sp.set_attribute("subagent.outcome", invoke_outcome) + except GraphInterrupt as gi: + invoke_outcome = "interrupted" + sp.set_attribute("subagent.outcome", invoke_outcome) + ot_metrics.record_subagent_invoke_duration( + (time.perf_counter() - invoke_start) * 1000, + subagent_type=subagent_type, + path=invoke_path, + outcome=invoke_outcome, + ) + ot_metrics.record_subagent_invoke_outcome( + subagent_type=subagent_type, + path=invoke_path, + outcome=invoke_outcome, + ) + _reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id) + except Exception: + invoke_outcome = "error" + sp.set_attribute("subagent.outcome", invoke_outcome) + ot_metrics.record_subagent_invoke_duration( + (time.perf_counter() - invoke_start) * 1000, + subagent_type=subagent_type, + path=invoke_path, + outcome=invoke_outcome, + ) + ot_metrics.record_subagent_invoke_outcome( + subagent_type=subagent_type, + path=invoke_path, + outcome=invoke_outcome, + ) + raise else: - try: - result = subagent.invoke(subagent_state, config=sub_config) - except GraphInterrupt as gi: - _reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id) + with ot.subagent_invoke_span( + subagent_type=subagent_type, path=invoke_path + ) as sp: + try: + result = subagent.invoke(subagent_state, config=sub_config) + sp.set_attribute("subagent.outcome", invoke_outcome) + except GraphInterrupt as gi: + invoke_outcome = "interrupted" + sp.set_attribute("subagent.outcome", invoke_outcome) + ot_metrics.record_subagent_invoke_duration( + (time.perf_counter() - invoke_start) * 1000, + subagent_type=subagent_type, + path=invoke_path, + outcome=invoke_outcome, + ) + ot_metrics.record_subagent_invoke_outcome( + subagent_type=subagent_type, + path=invoke_path, + outcome=invoke_outcome, + ) + _reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id) + except Exception: + invoke_outcome = "error" + sp.set_attribute("subagent.outcome", invoke_outcome) + ot_metrics.record_subagent_invoke_duration( + (time.perf_counter() - invoke_start) * 1000, + subagent_type=subagent_type, + path=invoke_path, + outcome=invoke_outcome, + ) + ot_metrics.record_subagent_invoke_outcome( + subagent_type=subagent_type, + path=invoke_path, + outcome=invoke_outcome, + ) + raise + invoke_elapsed_ms = (time.perf_counter() - invoke_start) * 1000 + ot_metrics.record_subagent_invoke_duration( + invoke_elapsed_ms, + subagent_type=subagent_type, + path=invoke_path, + outcome=invoke_outcome, + ) + ot_metrics.record_subagent_invoke_outcome( + subagent_type=subagent_type, + path=invoke_path, + outcome=invoke_outcome, + ) return _return_command_with_state_update(result, runtime.tool_call_id) async def atask( @@ -274,40 +354,104 @@ def build_task_tool_with_parent_config( # Prevent the parent's resume payload from leaking into subagent # interrupts via langgraph's parent_scratchpad fallback. drain_parent_null_resume(runtime) - try: - result = await subagent.ainvoke( - build_resume_command(resume_value, pending_id), - config=sub_config, - ) - except GraphInterrupt as gi: - ainvoke_outcome = "interrupted" - _perf_log.info( - "[hitl_route] atask EXIT subagent_type=%r path=%s outcome=%s " - "aget_state=%.3fs ainvoke=%.3fs total=%.3fs", - subagent_type, - invoke_path, - ainvoke_outcome, - aget_state_elapsed, - time.perf_counter() - ainvoke_start, - time.perf_counter() - atask_start, - ) - _reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id) + with ot.subagent_invoke_span( + subagent_type=subagent_type, path=invoke_path + ) as sp: + try: + result = await subagent.ainvoke( + build_resume_command(resume_value, pending_id), + config=sub_config, + ) + sp.set_attribute("subagent.outcome", ainvoke_outcome) + except GraphInterrupt as gi: + ainvoke_outcome = "interrupted" + sp.set_attribute("subagent.outcome", ainvoke_outcome) + ot_metrics.record_subagent_invoke_duration( + (time.perf_counter() - ainvoke_start) * 1000, + subagent_type=subagent_type, + path=invoke_path, + outcome=ainvoke_outcome, + ) + ot_metrics.record_subagent_invoke_outcome( + subagent_type=subagent_type, + path=invoke_path, + outcome=ainvoke_outcome, + ) + _perf_log.info( + "[hitl_route] atask EXIT subagent_type=%r path=%s outcome=%s " + "aget_state=%.3fs ainvoke=%.3fs total=%.3fs", + subagent_type, + invoke_path, + ainvoke_outcome, + aget_state_elapsed, + time.perf_counter() - ainvoke_start, + time.perf_counter() - atask_start, + ) + _reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id) + except Exception: + ainvoke_outcome = "error" + sp.set_attribute("subagent.outcome", ainvoke_outcome) + ot_metrics.record_subagent_invoke_duration( + (time.perf_counter() - ainvoke_start) * 1000, + subagent_type=subagent_type, + path=invoke_path, + outcome=ainvoke_outcome, + ) + ot_metrics.record_subagent_invoke_outcome( + subagent_type=subagent_type, + path=invoke_path, + outcome=ainvoke_outcome, + ) + raise else: - try: - result = await subagent.ainvoke(subagent_state, config=sub_config) - except GraphInterrupt as gi: - ainvoke_outcome = "interrupted" - _perf_log.info( - "[hitl_route] atask EXIT subagent_type=%r path=%s outcome=%s " - "aget_state=%.3fs ainvoke=%.3fs total=%.3fs", - subagent_type, - invoke_path, - ainvoke_outcome, - aget_state_elapsed, - time.perf_counter() - ainvoke_start, - time.perf_counter() - atask_start, - ) - _reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id) + with ot.subagent_invoke_span( + subagent_type=subagent_type, path=invoke_path + ) as sp: + try: + result = await subagent.ainvoke( + subagent_state, config=sub_config + ) + sp.set_attribute("subagent.outcome", ainvoke_outcome) + except GraphInterrupt as gi: + ainvoke_outcome = "interrupted" + sp.set_attribute("subagent.outcome", ainvoke_outcome) + ot_metrics.record_subagent_invoke_duration( + (time.perf_counter() - ainvoke_start) * 1000, + subagent_type=subagent_type, + path=invoke_path, + outcome=ainvoke_outcome, + ) + ot_metrics.record_subagent_invoke_outcome( + subagent_type=subagent_type, + path=invoke_path, + outcome=ainvoke_outcome, + ) + _perf_log.info( + "[hitl_route] atask EXIT subagent_type=%r path=%s outcome=%s " + "aget_state=%.3fs ainvoke=%.3fs total=%.3fs", + subagent_type, + invoke_path, + ainvoke_outcome, + aget_state_elapsed, + time.perf_counter() - ainvoke_start, + time.perf_counter() - atask_start, + ) + _reraise_stamped_subagent_interrupt(gi, runtime.tool_call_id) + except Exception: + ainvoke_outcome = "error" + sp.set_attribute("subagent.outcome", ainvoke_outcome) + ot_metrics.record_subagent_invoke_duration( + (time.perf_counter() - ainvoke_start) * 1000, + subagent_type=subagent_type, + path=invoke_path, + outcome=ainvoke_outcome, + ) + ot_metrics.record_subagent_invoke_outcome( + subagent_type=subagent_type, + path=invoke_path, + outcome=ainvoke_outcome, + ) + raise ainvoke_elapsed = time.perf_counter() - ainvoke_start except GraphInterrupt: raise @@ -326,6 +470,17 @@ def build_task_tool_with_parent_config( merge_elapsed, time.perf_counter() - atask_start, ) + ot_metrics.record_subagent_invoke_duration( + ainvoke_elapsed * 1000, + subagent_type=subagent_type, + path=invoke_path, + outcome=ainvoke_outcome, + ) + ot_metrics.record_subagent_invoke_outcome( + subagent_type=subagent_type, + path=invoke_path, + outcome=ainvoke_outcome, + ) return cmd return StructuredTool.from_function(