diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tool_end.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tool_end.py index ec7d6551c..421c67a6d 100644 --- a/surfsense_backend/app/tasks/chat/streaming/handlers/tool_end.py +++ b/surfsense_backend/app/tasks/chat/streaming/handlers/tool_end.py @@ -110,7 +110,9 @@ def iter_tool_end_frames( stream_result=result, langgraph_config=config, staged_workspace_file_path=staged_file_path, - tool_metadata=state.span_metadata_if_active(), + tool_metadata=state.tool_activity_metadata( + thinking_step_id=original_step_id, + ), ) yield from iter_tool_completion_emission_frames(emission_ctx) diff --git a/surfsense_backend/tests/unit/tasks/chat/streaming/test_stage_2_parity.py b/surfsense_backend/tests/unit/tasks/chat/streaming/test_stage_2_parity.py index 9ae7defec..3ee1ab622 100644 --- a/surfsense_backend/tests/unit/tasks/chat/streaming/test_stage_2_parity.py +++ b/surfsense_backend/tests/unit/tasks/chat/streaming/test_stage_2_parity.py @@ -111,8 +111,10 @@ def test_complete_active_thinking_step_mirrors_closure_semantics() -> None: svc = MagicMock() svc.format_thinking_step.return_value = "done-frame" completed: set[str] = set() + relay_state = AgentEventRelayState.for_invocation() frame, new_id = complete_active_thinking_step( + state=relay_state, streaming_service=svc, content_builder=None, last_active_step_id="thinking-1", @@ -125,6 +127,7 @@ def test_complete_active_thinking_step_mirrors_closure_semantics() -> None: assert "thinking-1" in completed frame2, id2 = complete_active_thinking_step( + state=relay_state, streaming_service=svc, content_builder=None, last_active_step_id="thinking-1", diff --git a/surfsense_backend/tests/unit/tasks/chat/test_content_builder.py b/surfsense_backend/tests/unit/tasks/chat/test_content_builder.py index 4b1fadd9c..9d3eb6fa4 100644 --- a/surfsense_backend/tests/unit/tasks/chat/test_content_builder.py +++ b/surfsense_backend/tests/unit/tasks/chat/test_content_builder.py @@ -15,6 +15,7 @@ import json import pytest +from app.services.new_streaming_service import VercelStreamingService from app.tasks.chat.content_builder import AssistantContentBuilder pytestmark = pytest.mark.unit @@ -231,6 +232,155 @@ class TestToolHeavyTurn: ) +# --------------------------------------------------------------------------- +# Task-span metadata on tool-call parts (JSONB persistence) +# --------------------------------------------------------------------------- + + +class TestToolCallSpanMetadata: + def test_input_available_merges_new_metadata_keys_after_start(self): + b = AssistantContentBuilder() + b.on_tool_input_start( + "call_t", "task", "lc_t", metadata={"spanId": "spn_1"} + ) + b.on_tool_input_available( + "call_t", + "task", + {"goal": "x"}, + "lc_t", + metadata={"traceId": "tr_1"}, + ) + part = b.snapshot()[0] + assert part["metadata"]["spanId"] == "spn_1" + assert part["metadata"]["traceId"] == "tr_1" + _assert_jsonb_safe(b.snapshot()) + + def test_input_available_does_not_overwrite_existing_metadata_keys(self): + b = AssistantContentBuilder() + b.on_tool_input_start( + "call_t", "task", "lc_t", metadata={"spanId": "spn_keep"} + ) + b.on_tool_input_available( + "call_t", "task", {}, "lc_t", metadata={"spanId": "spn_other"} + ) + assert b.snapshot()[0]["metadata"]["spanId"] == "spn_keep" + + def test_late_tool_input_available_carries_metadata(self): + b = AssistantContentBuilder() + b.on_tool_input_available( + "call_l", + "grep", + {"pattern": "TODO"}, + None, + metadata={"spanId": "spn_l"}, + ) + part = b.snapshot()[0] + assert part["metadata"] == {"spanId": "spn_l"} + _assert_jsonb_safe(b.snapshot()) + + def test_output_available_merges_without_clobbering_span_id(self): + b = AssistantContentBuilder() + b.on_tool_input_start("call_t", "ls", "lc", metadata={"spanId": "spn_x"}) + b.on_tool_input_available("call_t", "ls", {"path": "/"}, "lc") + b.on_tool_output_available( + "call_t", + {"ok": True}, + "lc", + metadata={"spanId": "spn_y", "extra": 1}, + ) + md = b.snapshot()[0]["metadata"] + assert md["spanId"] == "spn_x" + assert md["extra"] == 1 + + def test_output_available_adds_thinking_step_id_without_clobbering_span(self): + b = AssistantContentBuilder() + b.on_tool_input_start( + "call_t", + "ls", + "lc", + metadata={"spanId": "spn_x", "thinkingStepId": "thinking-3"}, + ) + b.on_tool_input_available("call_t", "ls", {"path": "/"}, "lc") + b.on_tool_output_available( + "call_t", + {"ok": True}, + "lc", + metadata={"spanId": "spn_x", "thinkingStepId": "thinking-3"}, + ) + md = b.snapshot()[0]["metadata"] + assert md["spanId"] == "spn_x" + assert md["thinkingStepId"] == "thinking-3" + + def test_output_available_with_none_metadata_preserves_prior(self): + b = AssistantContentBuilder() + b.on_tool_input_start("c", "ls", "lc", metadata={"spanId": "spn_1"}) + b.on_tool_input_available("c", "ls", {}, "lc") + b.on_tool_output_available("c", {"r": 1}, "lc", metadata=None) + assert b.snapshot()[0]["metadata"] == {"spanId": "spn_1"} + + def test_available_adds_thinking_step_id_after_chunk_only_start(self): + """Mirrors chunk ``tool-input-start`` then ``on_tool_start`` ``available``.""" + b = AssistantContentBuilder() + b.on_tool_input_start("lc_1", "ls", "lc_1", metadata={"spanId": "spn_a"}) + b.on_tool_input_available( + "lc_1", + "ls", + {"path": "/"}, + "lc_1", + metadata={"spanId": "spn_a", "thinkingStepId": "thinking-2"}, + ) + md = b.snapshot()[0]["metadata"] + assert md["spanId"] == "spn_a" + assert md["thinkingStepId"] == "thinking-2" + + +class TestVercelStreamingServiceToolMetadataWire: + """SSE payloads include optional ``metadata`` for FE grouping.""" + + @staticmethod + def _parse_sse_data_line(raw: str) -> dict: + assert raw.startswith("data: ") + payload = raw.split("data: ", 1)[1].split("\n\n", 1)[0].strip() + return json.loads(payload) + + def test_tool_input_available_includes_metadata_when_set(self): + svc = VercelStreamingService() + raw = svc.format_tool_input_available( + "id1", + "task", + {"a": 1}, + langchain_tool_call_id="lc1", + metadata={"spanId": "spn_w", "thinkingStepId": "thinking-4"}, + ) + body = self._parse_sse_data_line(raw) + assert body["type"] == "tool-input-available" + assert body["metadata"] == { + "spanId": "spn_w", + "thinkingStepId": "thinking-4", + } + + def test_tool_output_available_includes_metadata_when_set(self): + svc = VercelStreamingService() + raw = svc.format_tool_output_available( + "id1", + {"status": "completed"}, + langchain_tool_call_id="lc1", + metadata={"spanId": "spn_o", "thinkingStepId": "thinking-9"}, + ) + body = self._parse_sse_data_line(raw) + assert body["type"] == "tool-output-available" + assert body["metadata"] == { + "spanId": "spn_o", + "thinkingStepId": "thinking-9", + } + + def test_tool_input_available_omits_metadata_key_when_none(self): + svc = VercelStreamingService() + raw = svc.format_tool_input_available("id1", "ls", {}) + body = self._parse_sse_data_line(raw) + assert "metadata" not in body + + # --------------------------------------------------------------------------- # Thinking steps & separators # ---------------------------------------------------------------------------