From 0c0b8383bf3e230bdf4f49934eb7bf8fc9bc5510 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Sun, 31 May 2026 16:05:03 +0530 Subject: [PATCH] fix: fix rtf logs and gemini live turn taking --- api/services/pipecat/realtime/gemini_live.py | 37 ----- .../pipecat/realtime/openai_realtime.py | 10 +- .../pipecat/realtime_feedback_observer.py | 134 ++++-------------- ...test_gemini_live_reconnect_tool_results.py | 24 ++++ api/tests/test_realtime_feedback_observer.py | 100 +++++++++++++ pipecat | 2 +- 6 files changed, 159 insertions(+), 148 deletions(-) create mode 100644 api/tests/test_realtime_feedback_observer.py diff --git a/api/services/pipecat/realtime/gemini_live.py b/api/services/pipecat/realtime/gemini_live.py index abcc3ea..aba4880 100644 --- a/api/services/pipecat/realtime/gemini_live.py +++ b/api/services/pipecat/realtime/gemini_live.py @@ -16,9 +16,6 @@ Layers Dograh engine integration quirks onto upstream-pristine - **TTSSpeakFrame as greeting trigger.** The engine queues a TTSSpeakFrame to kick off the first response after node setup; the service intercepts it and runs the initial-context path. -- **Finalize-pending on transcriptions.** Marks the transcription emitted - immediately after VAD-stop as finalized, distinguishing it from - mid-turn partials. """ from typing import Any @@ -28,7 +25,6 @@ from loguru import logger from pipecat.frames.frames import ( BotStoppedSpeakingFrame, Frame, - TranscriptionFrame, TTSSpeakFrame, UserMuteStartedFrame, UserMuteStoppedFrame, @@ -37,7 +33,6 @@ from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.frame_processor import FrameDirection from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService from pipecat.services.llm_service import FunctionCallFromLLM -from pipecat.utils.time import time_now_iso8601 from pipecat.utils.tracing.service_decorators import traced_gemini_live @@ -58,9 +53,6 @@ class DograhGeminiLiveLLMService(GeminiLiveLLMService): # Function calls emitted by Gemini mid-bot-turn are deferred here and # invoked when the turn ends, so they don't race the turn's audio. self._pending_function_calls: list[FunctionCallFromLLM] = [] - # Tracks whether the next transcription to arrive should be marked as - # the finalized transcription for the current user turn. - self._finalize_pending: bool = False # ------------------------------------------------------------------ # Hooks from upstream GeminiLiveLLMService @@ -206,32 +198,3 @@ class DograhGeminiLiveLLMService(GeminiLiveLLMService): # a handle (e.g. node transitions before any handle was issued) are # followed by a function-call-result LLMContextFrame which feeds the # updated-context branch in _handle_context. - - # ------------------------------------------------------------------ - # Transcription: broadcast (so downstream voicemail detector and - # logs buffer both see it) and set finalized= for turn-boundary - # semantics. - # ------------------------------------------------------------------ - - async def _handle_user_started_speaking(self, frame): - await super()._handle_user_started_speaking(frame) - # A new VAD start invalidates any pending finalize from a prior stop - # that hasn't been paired with a transcription yet. - self._finalize_pending = False - - async def _handle_user_stopped_speaking(self, frame): - await super()._handle_user_stopped_speaking(frame) - self._finalize_pending = True - - async def _push_user_transcription(self, text: str, result=None): - await self._handle_user_transcription(text, True, self._settings.language) - finalized = self._finalize_pending - self._finalize_pending = False - await self.broadcast_frame( - TranscriptionFrame, - text=text, - user_id="", - timestamp=time_now_iso8601(), - result=result, - finalized=finalized, - ) diff --git a/api/services/pipecat/realtime/openai_realtime.py b/api/services/pipecat/realtime/openai_realtime.py index 8822c06..a23ba29 100644 --- a/api/services/pipecat/realtime/openai_realtime.py +++ b/api/services/pipecat/realtime/openai_realtime.py @@ -13,9 +13,8 @@ Adds: flow kicks off the bot's first response. - **One-off LLMMessagesAppendFrame handling** for ephemeral realtime prompts like user-idle checks, without mutating Dograh's local ``LLMContext``. -- **finalized=True on TranscriptionFrame** for parity with the Gemini - service (every OpenAI transcription via the ``completed`` event is - final by construction). +- **finalized=True on TranscriptionFrame** because every OpenAI + transcription via the ``completed`` event is final by construction. """ import json @@ -254,9 +253,8 @@ class DograhOpenAIRealtimeLLMService(OpenAIRealtimeLLMService): logger.error(f"Failed to process function call arguments: {e}") # ------------------------------------------------------------------ - # Transcription: broadcast with finalized=True for parity with the - # Gemini service (consumers that check `finalized` should see True - # for every completed-transcription event from OpenAI). + # Transcription: broadcast with finalized=True for every + # completed-transcription event from OpenAI. # ------------------------------------------------------------------ async def handle_evt_input_audio_transcription_completed(self, evt): diff --git a/api/services/pipecat/realtime_feedback_observer.py b/api/services/pipecat/realtime_feedback_observer.py index 3cc85c6..8db778c 100644 --- a/api/services/pipecat/realtime_feedback_observer.py +++ b/api/services/pipecat/realtime_feedback_observer.py @@ -4,9 +4,9 @@ This observer watches pipeline frames and sends relevant events (transcriptions, bot text, function calls, TTFB metrics) over WebSocket to provide real-time feedback in the UI. -For frames with presentation timestamps (pts), like TTSTextFrame, we respect -the timing by queuing them and sending at the appropriate time, similar to -how base_output.py handles timed frames. +For TTS text, we wait until the frame has passed through BaseOutputTransport. +That transport already applies presentation timestamp timing against audio +playback, so the UI text is emitted from the same clock as the spoken audio. Streaming vs. persisted data: - WebSocket receives all events in real-time (interim transcriptions, TTS text @@ -20,9 +20,7 @@ rather than being observed here, to ensure precise timing at the moment of node changes. """ -import asyncio import json -import time from typing import TYPE_CHECKING, Awaitable, Callable, Optional, Set from loguru import logger @@ -60,8 +58,8 @@ from pipecat.frames.frames import ( from pipecat.metrics.metrics import TTFBMetricsData from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.processors.frame_processor import FrameDirection +from pipecat.transports.base_output import BaseOutputTransport from pipecat.utils.enums import RealtimeFeedbackType -from pipecat.utils.time import nanoseconds_to_seconds class RealtimeFeedbackObserver(BaseObserver): @@ -69,7 +67,7 @@ class RealtimeFeedbackObserver(BaseObserver): WebSocket streaming (all events for live UI): - User transcriptions (interim and final) - - Bot TTS text (with pts-based timing) + - Bot TTS text after output transport timing - Function calls (start/end) - TTFB metrics (LLM generation time only) @@ -78,9 +76,6 @@ class RealtimeFeedbackObserver(BaseObserver): - Complete assistant transcripts per turn (via on_assistant_turn_stopped) - Function calls and TTFB metrics - For frames with pts (presentation timestamp), we queue them and send at the - appropriate time to sync with audio playback. - Note: Node transitions are handled by PipecatEngine.set_node() callback. """ @@ -100,105 +95,47 @@ class RealtimeFeedbackObserver(BaseObserver): self._logs_buffer = logs_buffer self._frames_seen: Set[str] = set() - # Clock/timing for pts-based frames (similar to base_output.py) - self._clock_queue: Optional[asyncio.PriorityQueue] = None - self._clock_task: Optional[asyncio.Task] = None - self._clock_start_time: Optional[float] = ( - None # Wall clock time when we started - ) - self._pts_start_time: Optional[int] = None # First pts value we saw - - async def _ensure_clock_task(self): - """Create the clock task if it doesn't exist.""" - if self._clock_queue is None: - self._clock_queue = asyncio.PriorityQueue() - self._clock_task = asyncio.create_task(self._clock_task_handler()) - - async def _cancel_clock_task(self): - """Cancel the clock task and clear the queue. - - Called on interruption to discard any pending bot text that - hasn't been sent yet. - """ - if self._clock_task: - self._clock_task.cancel() - try: - await self._clock_task - except asyncio.CancelledError: - pass - self._clock_task = None - self._clock_queue = None - # Reset timing references so next bot response starts fresh - self._clock_start_time = None - self._pts_start_time = None - async def cleanup(self): """Clean up resources. Must be called when the observer is no longer needed.""" - await self._cancel_clock_task() - - async def _handle_interruption(self): - """Handle interruption by clearing queued bot text. - - Similar to base_output.py's handle_interruptions, we cancel the - clock task and recreate it to discard pending frames. - """ - await self._cancel_clock_task() - - async def _clock_task_handler(self): - """Process timed frames from the queue, respecting their presentation timestamps. - - Similar to base_output.py's _clock_task_handler, we wait until the - frame's pts time has arrived before sending. - """ - while True: - try: - pts, _frame_id, message = await self._clock_queue.get() - - # Calculate when to send based on pts relative to our start time - if ( - self._clock_start_time is not None - and self._pts_start_time is not None - ): - # Target time = start wall time + (frame pts - start pts) in seconds - target_time = self._clock_start_time + nanoseconds_to_seconds( - pts - self._pts_start_time - ) - current_time = time.time() - if target_time > current_time: - await asyncio.sleep(target_time - current_time) - - # Send the message (clock queue only has TTS text, WS-only) - await self._send_ws(message) - self._clock_queue.task_done() - except asyncio.CancelledError: - break - except Exception as e: - logger.debug(f"Clock task error: {e}") + pass async def on_push_frame(self, data: FramePushed): """Process frames and send relevant ones to the client.""" frame = data.frame frame_direction = data.direction + source = data.source # Skip already processed frames (frames can be observed multiple times). # ErrorFrames are accepted in either direction — push_error() emits them - # UPSTREAM, and we still want to surface them to the UI. + # UPSTREAM, and we still want to surface them to the UI. Upstream-only + # transcription frames are accepted too: upstream Gemini Live emits user + # transcripts toward the user aggregator, not downstream. Broadcast + # transcription siblings are still handled only on the downstream copy to + # avoid duplicate live UI messages. if frame.id in self._frames_seen: return - if frame_direction != FrameDirection.DOWNSTREAM and not isinstance( - frame, ErrorFrame + if frame_direction != FrameDirection.DOWNSTREAM: + is_upstream_transcription = ( + isinstance(frame, (InterimTranscriptionFrame, TranscriptionFrame)) + and frame.broadcast_sibling_id is None + ) + if not isinstance(frame, ErrorFrame) and not is_upstream_transcription: + return + + # TTSTextFrame may be observed before the output transport has applied + # its audio clock. Match RTVIObserver: leave the frame unmarked so the + # transport-pushed copy can be handled with playback timing already done. + if isinstance(frame, TTSTextFrame) and not isinstance( + source, BaseOutputTransport ): return + self._frames_seen.add(frame.id) logger.trace(f"{self} Received Frame: {frame} Direction: {frame_direction}") - # Handle pipeline termination - stop clock task - if isinstance(frame, (EndFrame, CancelFrame, StopFrame)): - await self._cancel_clock_task() - # Handle interruptions - clear any queued bot text - elif isinstance(frame, InterruptionFrame): - await self._handle_interruption() + if isinstance(frame, (EndFrame, CancelFrame, StopFrame, InterruptionFrame)): + return # Bot speaking state - WS only (ephemeral state signals, not persisted) elif isinstance(frame, BotStartedSpeakingFrame): await self._send_ws( @@ -245,27 +182,16 @@ class RealtimeFeedbackObserver(BaseObserver): elif isinstance(frame, TTSSpeakFrame): if getattr(frame, "persist_to_logs", False): await self._append_to_buffer(build_bot_text_event(text=frame.text)) - # Handle bot TTS text - respect pts timing, WebSocket only + # Handle bot TTS text after output transport timing, WebSocket only # Complete turn text is persisted via register_turn_handlers, # except for frames explicitly flagged persist_to_logs (e.g. recording # transcripts from play_audio) which bypass the aggregator path. elif isinstance(frame, TTSTextFrame): message = build_bot_text_event(text=frame.text) - # If frame has pts, queue it for timed delivery - if frame.pts: - # Initialize timing reference on first pts frame - if self._pts_start_time is None: - self._pts_start_time = frame.pts - self._clock_start_time = time.time() - - await self._ensure_clock_task() - await self._clock_queue.put((frame.pts, frame.id, message)) - elif getattr(frame, "persist_to_logs", False): - # No pts + explicit persistence request (recording transcript). + if getattr(frame, "persist_to_logs", False): await self._send_message(message) else: - # No pts, send immediately await self._send_ws(message) # Handle function call in progress elif ( diff --git a/api/tests/test_gemini_live_reconnect_tool_results.py b/api/tests/test_gemini_live_reconnect_tool_results.py index 1ad0670..8adb1e1 100644 --- a/api/tests/test_gemini_live_reconnect_tool_results.py +++ b/api/tests/test_gemini_live_reconnect_tool_results.py @@ -3,7 +3,9 @@ from types import SimpleNamespace from unittest.mock import AsyncMock import pytest +from pipecat.frames.frames import TranscriptionFrame from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.frame_processor import FrameDirection from api.services.pipecat.realtime.gemini_live import DograhGeminiLiveLLMService @@ -84,3 +86,25 @@ async def test_disconnect_does_not_forget_previously_delivered_tool_results(): service._tool_result.assert_not_awaited() assert service._completed_tool_calls == {"call-transition"} + + +@pytest.mark.asyncio +async def test_user_transcription_matches_upstream_upstream_push_behavior(): + service = _make_service() + service._handle_user_transcription = AsyncMock() + service.push_frame = AsyncMock() + service.broadcast_frame = AsyncMock() + + await service._push_user_transcription("Hi there") + + service._handle_user_transcription.assert_awaited_once_with( + "Hi there", True, service._settings.language + ) + service.broadcast_frame.assert_not_awaited() + service.push_frame.assert_awaited_once() + + frame, direction = service.push_frame.await_args.args + assert isinstance(frame, TranscriptionFrame) + assert frame.text == "Hi there" + assert frame.finalized is False + assert direction == FrameDirection.UPSTREAM diff --git a/api/tests/test_realtime_feedback_observer.py b/api/tests/test_realtime_feedback_observer.py new file mode 100644 index 0000000..967f1a0 --- /dev/null +++ b/api/tests/test_realtime_feedback_observer.py @@ -0,0 +1,100 @@ +from types import SimpleNamespace + +import pytest +from pipecat.frames.frames import TranscriptionFrame, TTSTextFrame +from pipecat.observers.base_observer import FramePushed +from pipecat.processors.frame_processor import FrameDirection +from pipecat.transports.base_output import BaseOutputTransport +from pipecat.transports.base_transport import TransportParams + +from api.services.pipecat.realtime_feedback_observer import RealtimeFeedbackObserver + + +def _frame_pushed(frame, direction, *, source=None): + return FramePushed( + source=source or SimpleNamespace(), + destination=SimpleNamespace(), + frame=frame, + direction=direction, + timestamp=0, + ) + + +@pytest.mark.asyncio +async def test_observer_streams_upstream_only_transcription_frames(): + messages = [] + + async def ws_sender(message): + messages.append(message) + + observer = RealtimeFeedbackObserver(ws_sender=ws_sender) + frame = TranscriptionFrame( + "Hi there", + user_id="user-1", + timestamp="2026-01-01T00:00:00+00:00", + ) + + await observer.on_push_frame(_frame_pushed(frame, FrameDirection.UPSTREAM)) + + assert messages == [ + { + "type": "rtf-user-transcription", + "payload": { + "text": "Hi there", + "final": True, + "timestamp": "2026-01-01T00:00:00+00:00", + "user_id": "user-1", + }, + } + ] + + +@pytest.mark.asyncio +async def test_observer_ignores_upstream_broadcast_transcription_sibling(): + messages = [] + + async def ws_sender(message): + messages.append(message) + + observer = RealtimeFeedbackObserver(ws_sender=ws_sender) + frame = TranscriptionFrame( + "Hi there", + user_id="user-1", + timestamp="2026-01-01T00:00:00+00:00", + ) + frame.broadcast_sibling_id = 1234 + + await observer.on_push_frame(_frame_pushed(frame, FrameDirection.UPSTREAM)) + + assert messages == [] + + +@pytest.mark.asyncio +async def test_observer_waits_for_tts_text_from_output_transport(): + messages = [] + + async def ws_sender(message): + messages.append(message) + + observer = RealtimeFeedbackObserver(ws_sender=ws_sender) + frame = TTSTextFrame("Hello", aggregated_by="word") + frame.pts = 123 + + await observer.on_push_frame(_frame_pushed(frame, FrameDirection.DOWNSTREAM)) + assert messages == [] + + output_transport = BaseOutputTransport(TransportParams()) + await observer.on_push_frame( + _frame_pushed( + frame, + FrameDirection.DOWNSTREAM, + source=output_transport, + ) + ) + + assert messages == [ + { + "type": "rtf-bot-text", + "payload": {"text": "Hello"}, + } + ] diff --git a/pipecat b/pipecat index 6e410e0..228324a 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 6e410e06cc9f71fbadfb3850f87848d2288d6651 +Subproject commit 228324a146a6765c6b8d610963bc80d7bc8cb9f7