From 67479e98fd7662fd535e30c80eaf1f055146fc82 Mon Sep 17 00:00:00 2001 From: Mohamed-Mamdouh Date: Thu, 21 May 2026 03:13:50 +0100 Subject: [PATCH] fix timestamps in tuner accumelator (#335) * fix timestamps in tuner accumelator * chore: refactor strip_thought_ids_from_messages --------- Co-authored-by: Abhishek Kumar --- api/services/integrations/tuner/collector.py | 13 +++++- api/tests/test_message_sanitization.py | 42 ++++++++++++++++++++ pipecat | 2 +- 3 files changed, 54 insertions(+), 3 deletions(-) create mode 100644 api/tests/test_message_sanitization.py diff --git a/api/services/integrations/tuner/collector.py b/api/services/integrations/tuner/collector.py index e55ab85..73dd410 100644 --- a/api/services/integrations/tuner/collector.py +++ b/api/services/integrations/tuner/collector.py @@ -23,6 +23,7 @@ from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.observers.turn_tracking_observer import TurnTrackingObserver from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver from pipecat.processors.frame_processor import FrameDirection +from pipecat.utils.context.message_sanitization import strip_thought_ids_from_messages from tuner_pipecat_sdk.accumulator import CallAccumulator from tuner_pipecat_sdk.payload_builder import build_payload @@ -76,6 +77,7 @@ class TunerCollector(BaseObserver): self._agent_version = agent_version self._acc = CallAccumulator() self._acc.call_start_abs_ns = time.time_ns() + self._pipeline_start_rel_ns: int | None = None self._context_provider: Callable[[], list[dict[str, Any]]] | None = None self._processed_frames: set[int] = set() self._frame_history: deque[int] = deque(maxlen=max_frames) @@ -130,7 +132,14 @@ class TunerCollector(BaseObserver): self._processed_frames = set(self._frame_history) frame = data.frame - timestamp_ns = data.timestamp + + # data.timestamp is a pipeline-relative clock (ns since pipeline start). + # Convert to absolute ns so the accumulator's _rel_ms() works correctly. + if self._pipeline_start_rel_ns is None: + self._pipeline_start_rel_ns = data.timestamp + timestamp_ns = self._acc.call_start_abs_ns + ( + data.timestamp - self._pipeline_start_rel_ns + ) if isinstance(frame, StartFrame): self._acc.on_start(timestamp_ns) @@ -165,7 +174,7 @@ class TunerCollector(BaseObserver): ) return None - transcript = list(self._context_provider()) + transcript = strip_thought_ids_from_messages(list(self._context_provider())) payload = build_payload( self._acc, _PayloadConfig( diff --git a/api/tests/test_message_sanitization.py b/api/tests/test_message_sanitization.py new file mode 100644 index 0000000..2d2ef66 --- /dev/null +++ b/api/tests/test_message_sanitization.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from copy import deepcopy + +from pipecat.utils.context.message_sanitization import ( + strip_thought_from_id, + strip_thought_ids_from_messages, +) + + +def test_strip_thought_from_id(): + assert strip_thought_from_id("call_123__thought__abc") == "call_123" + assert strip_thought_from_id("call_123") == "call_123" + assert strip_thought_from_id(None) is None + + +def test_strip_thought_ids_from_messages_does_not_mutate_input(): + messages = [ + { + "role": "assistant", + "tool_calls": [ + { + "id": "call_1__thought__hidden", + "type": "function", + "function": {"name": "lookup", "arguments": "{}"}, + } + ], + }, + { + "role": "tool", + "tool_call_id": "call_1__thought__hidden", + "content": '{"status":"ok"}', + }, + ] + original = deepcopy(messages) + + cleaned = strip_thought_ids_from_messages(messages) + + assert messages == original + assert cleaned is not messages + assert cleaned[0]["tool_calls"][0]["id"] == "call_1" + assert cleaned[1]["tool_call_id"] == "call_1" diff --git a/pipecat b/pipecat index ce4ee2d..6b4474c 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit ce4ee2d6fc0d0982ad03a1468a517cc5e568aaa9 +Subproject commit 6b4474c1b870eae5e42ef7c5eec1b7f37fcecc61