From d37d6d05c1d353368476f94fa13d2817adad46f4 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Sat, 16 May 2026 16:53:55 +0530 Subject: [PATCH] fix: resample telephony audio for openai realtime --- .../pipecat/realtime/openai_realtime.py | 97 +++++++++++++++++++ api/tests/test_realtime_message_append.py | 59 +++++++++++ pipecat | 2 +- 3 files changed, 157 insertions(+), 1 deletion(-) create mode 100644 api/tests/test_realtime_message_append.py diff --git a/api/services/pipecat/realtime/openai_realtime.py b/api/services/pipecat/realtime/openai_realtime.py index a01c077..8822c06 100644 --- a/api/services/pipecat/realtime/openai_realtime.py +++ b/api/services/pipecat/realtime/openai_realtime.py @@ -11,12 +11,15 @@ Adds: - **User-mute audio gating** via ``UserMuteStarted/StoppedFrame``. - **TTSSpeakFrame as initial-response trigger** so the engine's greeting flow kicks off the bot's first response. +- **One-off LLMMessagesAppendFrame handling** for ephemeral realtime prompts + like user-idle checks, without mutating Dograh's local ``LLMContext``. - **finalized=True on TranscriptionFrame** for parity with the Gemini service (every OpenAI transcription via the ``completed`` event is final by construction). """ import json +from typing import Any from loguru import logger @@ -24,6 +27,8 @@ from pipecat.frames.frames import ( BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame, + LLMFullResponseStartFrame, + LLMMessagesAppendFrame, TranscriptionFrame, TTSSpeakFrame, UserMuteStartedFrame, @@ -32,6 +37,7 @@ from pipecat.frames.frames import ( from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.frame_processor import FrameDirection from pipecat.services.llm_service import FunctionCallFromLLM +from pipecat.services.openai.realtime import events from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService from pipecat.transcriptions.language import Language from pipecat.utils.time import time_now_iso8601 @@ -81,6 +87,9 @@ class DograhOpenAIRealtimeLLMService(OpenAIRealtimeLLMService): # Don't forward the frame; the audio path is owned by the realtime # service itself. return + if isinstance(frame, LLMMessagesAppendFrame): + await self._handle_messages_append(frame) + return if isinstance(frame, BotStartedSpeakingFrame): self._bot_is_speaking = True elif isinstance(frame, BotStoppedSpeakingFrame): @@ -88,6 +97,33 @@ class DograhOpenAIRealtimeLLMService(OpenAIRealtimeLLMService): await self._run_pending_function_calls() await super().process_frame(frame, direction) + async def _handle_messages_append(self, frame: LLMMessagesAppendFrame): + """Consume a one-off append frame without mutating the local LLMContext.""" + if self._disconnecting: + return + + if not self._api_session_ready: + if frame.run_llm: + logger.debug( + f"{self}: LLMMessagesAppendFrame received before session ready; " + "deferring response until the session is initialized" + ) + self._run_llm_when_api_session_ready = True + return + + appended_any = False + for message in frame.messages: + item = self._message_to_conversation_item(message) + if item is None: + continue + evt = events.ConversationItemCreateEvent(item=item) + self._messages_added_manually[evt.item.id] = True + await self.send_client_event(evt) + appended_any = True + + if frame.run_llm and appended_any: + await self._send_manual_response_create() + async def _handle_context(self, context: LLMContext): if not self._handled_initial_context: if context is None: @@ -107,6 +143,67 @@ class DograhOpenAIRealtimeLLMService(OpenAIRealtimeLLMService): return await super()._send_user_audio(frame) + def _message_to_conversation_item( + self, message: dict[str, Any] + ) -> events.ConversationItem | None: + if not isinstance(message, dict): + logger.warning( + f"{self}: skipping unsupported appended message payload {message!r}" + ) + return None + + role = message.get("role") + if role not in {"user", "system", "developer"}: + logger.warning( + f"{self}: skipping unsupported appended message role {role!r}" + ) + return None + + text = self._extract_text_content(message.get("content")) + if not text: + logger.warning( + f"{self}: skipping appended message with unsupported content {message!r}" + ) + return None + + item_role = "system" if role in {"system", "developer"} else "user" + return events.ConversationItem( + type="message", + role=item_role, + content=[events.ItemContent(type="input_text", text=text)], + ) + + @staticmethod + def _extract_text_content(content: Any) -> str | None: + if isinstance(content, str): + return content + if isinstance(content, list): + parts: list[str] = [] + for part in content: + if not isinstance(part, dict): + return None + if part.get("type") != "text": + return None + text = part.get("text") + if not isinstance(text, str): + return None + parts.append(text) + return "\n".join(parts) if parts else None + return None + + async def _send_manual_response_create(self): + """Trigger inference after manually appending conversation items.""" + await self.push_frame(LLMFullResponseStartFrame()) + await self.start_processing_metrics() + await self.start_ttfb_metrics() + await self.send_client_event( + events.ResponseCreateEvent( + response=events.ResponseProperties( + output_modalities=self._get_enabled_modalities() + ) + ) + ) + async def _run_pending_function_calls(self): if not self._deferred_function_calls: return diff --git a/api/tests/test_realtime_message_append.py b/api/tests/test_realtime_message_append.py new file mode 100644 index 0000000..56ec892 --- /dev/null +++ b/api/tests/test_realtime_message_append.py @@ -0,0 +1,59 @@ +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest +from pipecat.frames.frames import LLMMessagesAppendFrame +from pipecat.services.openai.realtime import events + +from api.services.pipecat.realtime.openai_realtime import ( + DograhOpenAIRealtimeLLMService, +) +from api.services.workflow.pipecat_engine_callbacks import UserIdleHandler + + +@pytest.mark.asyncio +async def test_openai_realtime_messages_append_frame_sends_conversation_item(): + service = DograhOpenAIRealtimeLLMService(api_key="test") + service._api_session_ready = True + service.send_client_event = AsyncMock() + service._send_manual_response_create = AsyncMock() + + await service._handle_messages_append( + LLMMessagesAppendFrame( + [{"role": "user", "content": "Are you still there?"}], + run_llm=True, + ) + ) + + service.send_client_event.assert_awaited_once() + event = service.send_client_event.await_args.args[0] + assert isinstance(event, events.ConversationItemCreateEvent) + assert event.item.role == "user" + assert event.item.type == "message" + assert event.item.content == [ + events.ItemContent(type="input_text", text="Are you still there?") + ] + service._send_manual_response_create.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_user_idle_handler_uses_realtime_append_path(): + engine = SimpleNamespace( + llm=SimpleNamespace(), + end_call_with_reason=AsyncMock(), + ) + aggregator = SimpleNamespace(push_frame=AsyncMock()) + handler = UserIdleHandler(engine) + + await handler.handle_idle(aggregator) + + aggregator.push_frame.assert_awaited_once() + frame = aggregator.push_frame.await_args.args[0] + assert isinstance(frame, LLMMessagesAppendFrame) + assert frame.run_llm is True + assert frame.messages == [ + { + "role": "user", + "content": "The user has been quiet. Politely and briefly ask if they're still there in the language that the user has been speaking so far.", + } + ] diff --git a/pipecat b/pipecat index 17b474d..f780c6d 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 17b474db8bdc8ae832825e9f601309c093fee0ed +Subproject commit f780c6de083d607adc7779109cad37f8b5a7030d