Carry thinkingStepId on tool output and extend builder and parity tests.

This commit is contained in:
CREDO23 2026-05-08 23:17:12 +02:00
parent 32092c0b65
commit 1761b60c16
3 changed files with 156 additions and 1 deletions

View file

@ -110,7 +110,9 @@ def iter_tool_end_frames(
stream_result=result, stream_result=result,
langgraph_config=config, langgraph_config=config,
staged_workspace_file_path=staged_file_path, staged_workspace_file_path=staged_file_path,
tool_metadata=state.span_metadata_if_active(), tool_metadata=state.tool_activity_metadata(
thinking_step_id=original_step_id,
),
) )
yield from iter_tool_completion_emission_frames(emission_ctx) yield from iter_tool_completion_emission_frames(emission_ctx)

View file

@ -111,8 +111,10 @@ def test_complete_active_thinking_step_mirrors_closure_semantics() -> None:
svc = MagicMock() svc = MagicMock()
svc.format_thinking_step.return_value = "done-frame" svc.format_thinking_step.return_value = "done-frame"
completed: set[str] = set() completed: set[str] = set()
relay_state = AgentEventRelayState.for_invocation()
frame, new_id = complete_active_thinking_step( frame, new_id = complete_active_thinking_step(
state=relay_state,
streaming_service=svc, streaming_service=svc,
content_builder=None, content_builder=None,
last_active_step_id="thinking-1", last_active_step_id="thinking-1",
@ -125,6 +127,7 @@ def test_complete_active_thinking_step_mirrors_closure_semantics() -> None:
assert "thinking-1" in completed assert "thinking-1" in completed
frame2, id2 = complete_active_thinking_step( frame2, id2 = complete_active_thinking_step(
state=relay_state,
streaming_service=svc, streaming_service=svc,
content_builder=None, content_builder=None,
last_active_step_id="thinking-1", last_active_step_id="thinking-1",

View file

@ -15,6 +15,7 @@ import json
import pytest import pytest
from app.services.new_streaming_service import VercelStreamingService
from app.tasks.chat.content_builder import AssistantContentBuilder from app.tasks.chat.content_builder import AssistantContentBuilder
pytestmark = pytest.mark.unit pytestmark = pytest.mark.unit
@ -231,6 +232,155 @@ class TestToolHeavyTurn:
) )
# ---------------------------------------------------------------------------
# Task-span metadata on tool-call parts (JSONB persistence)
# ---------------------------------------------------------------------------
class TestToolCallSpanMetadata:
def test_input_available_merges_new_metadata_keys_after_start(self):
b = AssistantContentBuilder()
b.on_tool_input_start(
"call_t", "task", "lc_t", metadata={"spanId": "spn_1"}
)
b.on_tool_input_available(
"call_t",
"task",
{"goal": "x"},
"lc_t",
metadata={"traceId": "tr_1"},
)
part = b.snapshot()[0]
assert part["metadata"]["spanId"] == "spn_1"
assert part["metadata"]["traceId"] == "tr_1"
_assert_jsonb_safe(b.snapshot())
def test_input_available_does_not_overwrite_existing_metadata_keys(self):
b = AssistantContentBuilder()
b.on_tool_input_start(
"call_t", "task", "lc_t", metadata={"spanId": "spn_keep"}
)
b.on_tool_input_available(
"call_t", "task", {}, "lc_t", metadata={"spanId": "spn_other"}
)
assert b.snapshot()[0]["metadata"]["spanId"] == "spn_keep"
def test_late_tool_input_available_carries_metadata(self):
b = AssistantContentBuilder()
b.on_tool_input_available(
"call_l",
"grep",
{"pattern": "TODO"},
None,
metadata={"spanId": "spn_l"},
)
part = b.snapshot()[0]
assert part["metadata"] == {"spanId": "spn_l"}
_assert_jsonb_safe(b.snapshot())
def test_output_available_merges_without_clobbering_span_id(self):
b = AssistantContentBuilder()
b.on_tool_input_start("call_t", "ls", "lc", metadata={"spanId": "spn_x"})
b.on_tool_input_available("call_t", "ls", {"path": "/"}, "lc")
b.on_tool_output_available(
"call_t",
{"ok": True},
"lc",
metadata={"spanId": "spn_y", "extra": 1},
)
md = b.snapshot()[0]["metadata"]
assert md["spanId"] == "spn_x"
assert md["extra"] == 1
def test_output_available_adds_thinking_step_id_without_clobbering_span(self):
b = AssistantContentBuilder()
b.on_tool_input_start(
"call_t",
"ls",
"lc",
metadata={"spanId": "spn_x", "thinkingStepId": "thinking-3"},
)
b.on_tool_input_available("call_t", "ls", {"path": "/"}, "lc")
b.on_tool_output_available(
"call_t",
{"ok": True},
"lc",
metadata={"spanId": "spn_x", "thinkingStepId": "thinking-3"},
)
md = b.snapshot()[0]["metadata"]
assert md["spanId"] == "spn_x"
assert md["thinkingStepId"] == "thinking-3"
def test_output_available_with_none_metadata_preserves_prior(self):
b = AssistantContentBuilder()
b.on_tool_input_start("c", "ls", "lc", metadata={"spanId": "spn_1"})
b.on_tool_input_available("c", "ls", {}, "lc")
b.on_tool_output_available("c", {"r": 1}, "lc", metadata=None)
assert b.snapshot()[0]["metadata"] == {"spanId": "spn_1"}
def test_available_adds_thinking_step_id_after_chunk_only_start(self):
"""Mirrors chunk ``tool-input-start`` then ``on_tool_start`` ``available``."""
b = AssistantContentBuilder()
b.on_tool_input_start("lc_1", "ls", "lc_1", metadata={"spanId": "spn_a"})
b.on_tool_input_available(
"lc_1",
"ls",
{"path": "/"},
"lc_1",
metadata={"spanId": "spn_a", "thinkingStepId": "thinking-2"},
)
md = b.snapshot()[0]["metadata"]
assert md["spanId"] == "spn_a"
assert md["thinkingStepId"] == "thinking-2"
class TestVercelStreamingServiceToolMetadataWire:
"""SSE payloads include optional ``metadata`` for FE grouping."""
@staticmethod
def _parse_sse_data_line(raw: str) -> dict:
assert raw.startswith("data: ")
payload = raw.split("data: ", 1)[1].split("\n\n", 1)[0].strip()
return json.loads(payload)
def test_tool_input_available_includes_metadata_when_set(self):
svc = VercelStreamingService()
raw = svc.format_tool_input_available(
"id1",
"task",
{"a": 1},
langchain_tool_call_id="lc1",
metadata={"spanId": "spn_w", "thinkingStepId": "thinking-4"},
)
body = self._parse_sse_data_line(raw)
assert body["type"] == "tool-input-available"
assert body["metadata"] == {
"spanId": "spn_w",
"thinkingStepId": "thinking-4",
}
def test_tool_output_available_includes_metadata_when_set(self):
svc = VercelStreamingService()
raw = svc.format_tool_output_available(
"id1",
{"status": "completed"},
langchain_tool_call_id="lc1",
metadata={"spanId": "spn_o", "thinkingStepId": "thinking-9"},
)
body = self._parse_sse_data_line(raw)
assert body["type"] == "tool-output-available"
assert body["metadata"] == {
"spanId": "spn_o",
"thinkingStepId": "thinking-9",
}
def test_tool_input_available_omits_metadata_key_when_none(self):
svc = VercelStreamingService()
raw = svc.format_tool_input_available("id1", "ls", {})
body = self._parse_sse_data_line(raw)
assert "metadata" not in body
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Thinking steps & separators # Thinking steps & separators
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------