mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
fix: resample telephony audio for openai realtime
This commit is contained in:
parent
3ddb66e2f7
commit
d37d6d05c1
3 changed files with 157 additions and 1 deletions
|
|
@ -11,12 +11,15 @@ Adds:
|
|||
- **User-mute audio gating** via ``UserMuteStarted/StoppedFrame``.
|
||||
- **TTSSpeakFrame as initial-response trigger** so the engine's greeting
|
||||
flow kicks off the bot's first response.
|
||||
- **One-off LLMMessagesAppendFrame handling** for ephemeral realtime prompts
|
||||
like user-idle checks, without mutating Dograh's local ``LLMContext``.
|
||||
- **finalized=True on TranscriptionFrame** for parity with the Gemini
|
||||
service (every OpenAI transcription via the ``completed`` event is
|
||||
final by construction).
|
||||
"""
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
|
@ -24,6 +27,8 @@ from pipecat.frames.frames import (
|
|||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
Frame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesAppendFrame,
|
||||
TranscriptionFrame,
|
||||
TTSSpeakFrame,
|
||||
UserMuteStartedFrame,
|
||||
|
|
@ -32,6 +37,7 @@ from pipecat.frames.frames import (
|
|||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.llm_service import FunctionCallFromLLM
|
||||
from pipecat.services.openai.realtime import events
|
||||
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
|
@ -81,6 +87,9 @@ class DograhOpenAIRealtimeLLMService(OpenAIRealtimeLLMService):
|
|||
# Don't forward the frame; the audio path is owned by the realtime
|
||||
# service itself.
|
||||
return
|
||||
if isinstance(frame, LLMMessagesAppendFrame):
|
||||
await self._handle_messages_append(frame)
|
||||
return
|
||||
if isinstance(frame, BotStartedSpeakingFrame):
|
||||
self._bot_is_speaking = True
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
|
|
@ -88,6 +97,33 @@ class DograhOpenAIRealtimeLLMService(OpenAIRealtimeLLMService):
|
|||
await self._run_pending_function_calls()
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
async def _handle_messages_append(self, frame: LLMMessagesAppendFrame):
    """Send a one-off append frame to the realtime session.

    Every message in ``frame.messages`` that converts to a conversation
    item is sent as a ``ConversationItemCreateEvent``. The local
    ``LLMContext`` is not modified here. When ``frame.run_llm`` is set and
    at least one item was sent, a response is requested afterwards.

    Args:
        frame: The append frame carrying the messages and the ``run_llm``
            flag.
    """
    # Nothing is sent once the service has started disconnecting.
    if self._disconnecting:
        return

    session_ready = self._api_session_ready
    if not session_ready and frame.run_llm:
        logger.debug(
            f"{self}: LLMMessagesAppendFrame received before session ready; "
            "deferring response until the session is initialized"
        )
        self._run_llm_when_api_session_ready = True
    if not session_ready:
        # NOTE(review): the frame's messages are not queued on this path;
        # only the response request is remembered. Confirm this is intended.
        return

    sent_count = 0
    for raw_message in frame.messages:
        conversation_item = self._message_to_conversation_item(raw_message)
        if conversation_item is not None:
            create_event = events.ConversationItemCreateEvent(item=conversation_item)
            # Record the item id before sending; presumably this lets the
            # service recognise items it created itself (verify in parent).
            self._messages_added_manually[create_event.item.id] = True
            await self.send_client_event(create_event)
            sent_count += 1

    if not frame.run_llm:
        return
    if sent_count:
        await self._send_manual_response_create()
|
||||
|
||||
async def _handle_context(self, context: LLMContext):
|
||||
if not self._handled_initial_context:
|
||||
if context is None:
|
||||
|
|
@ -107,6 +143,67 @@ class DograhOpenAIRealtimeLLMService(OpenAIRealtimeLLMService):
|
|||
return
|
||||
await super()._send_user_audio(frame)
|
||||
|
||||
def _message_to_conversation_item(
    self, message: dict[str, Any]
) -> events.ConversationItem | None:
    """Convert an appended chat message into a realtime conversation item.

    Args:
        message: A message mapping with ``role`` and ``content`` keys.

    Returns:
        A ``message``-type ``ConversationItem`` holding one ``input_text``
        part, or ``None`` (after logging a warning) when the payload is not
        a dict, the role is not ``user``/``system``/``developer``, or no
        text could be extracted from the content.
    """
    # Source role -> role used on the conversation item; "developer" is
    # sent as "system".
    role_map = {"user": "user", "system": "system", "developer": "system"}

    if not isinstance(message, dict):
        logger.warning(
            f"{self}: skipping unsupported appended message payload {message!r}"
        )
        return None

    source_role = message.get("role")
    if source_role not in role_map:
        logger.warning(
            f"{self}: skipping unsupported appended message role {source_role!r}"
        )
        return None

    body_text = self._extract_text_content(message.get("content"))
    if not body_text:
        # Covers both unsupported content shapes (None) and empty text.
        logger.warning(
            f"{self}: skipping appended message with unsupported content {message!r}"
        )
        return None

    text_part = events.ItemContent(type="input_text", text=body_text)
    return events.ConversationItem(
        type="message",
        role=role_map[source_role],
        content=[text_part],
    )
|
||||
|
||||
@staticmethod
|
||||
def _extract_text_content(content: Any) -> str | None:
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
if isinstance(content, list):
|
||||
parts: list[str] = []
|
||||
for part in content:
|
||||
if not isinstance(part, dict):
|
||||
return None
|
||||
if part.get("type") != "text":
|
||||
return None
|
||||
text = part.get("text")
|
||||
if not isinstance(text, str):
|
||||
return None
|
||||
parts.append(text)
|
||||
return "\n".join(parts) if parts else None
|
||||
return None
|
||||
|
||||
async def _send_manual_response_create(self):
    """Request a model response after conversation items were added by hand.

    Pushes an ``LLMFullResponseStartFrame``, starts the processing and TTFB
    metrics, then sends a ``ResponseCreateEvent`` whose output modalities
    come from ``_get_enabled_modalities()``.
    """
    await self.push_frame(LLMFullResponseStartFrame())
    await self.start_processing_metrics()
    await self.start_ttfb_metrics()

    response_props = events.ResponseProperties(
        output_modalities=self._get_enabled_modalities()
    )
    await self.send_client_event(events.ResponseCreateEvent(response=response_props))
|
||||
|
||||
async def _run_pending_function_calls(self):
|
||||
if not self._deferred_function_calls:
|
||||
return
|
||||
|
|
|
|||
59
api/tests/test_realtime_message_append.py
Normal file
59
api/tests/test_realtime_message_append.py
Normal file
|
|
@ -0,0 +1,59 @@
|
|||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
from pipecat.frames.frames import LLMMessagesAppendFrame
|
||||
from pipecat.services.openai.realtime import events
|
||||
|
||||
from api.services.pipecat.realtime.openai_realtime import (
|
||||
DograhOpenAIRealtimeLLMService,
|
||||
)
|
||||
from api.services.workflow.pipecat_engine_callbacks import UserIdleHandler
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_openai_realtime_messages_append_frame_sends_conversation_item():
    """A user append frame yields one item-create event and one response request."""
    llm_service = DograhOpenAIRealtimeLLMService(api_key="test")
    # Pretend the session is ready and stub out everything that would hit
    # the network.
    llm_service._api_session_ready = True
    llm_service.send_client_event = AsyncMock()
    llm_service._send_manual_response_create = AsyncMock()

    append_frame = LLMMessagesAppendFrame(
        [{"role": "user", "content": "Are you still there?"}],
        run_llm=True,
    )
    await llm_service._handle_messages_append(append_frame)

    llm_service.send_client_event.assert_awaited_once()
    sent_event = llm_service.send_client_event.await_args.args[0]
    assert isinstance(sent_event, events.ConversationItemCreateEvent)

    sent_item = sent_event.item
    assert sent_item.role == "user"
    assert sent_item.type == "message"
    expected_content = [
        events.ItemContent(type="input_text", text="Are you still there?")
    ]
    assert sent_item.content == expected_content
    llm_service._send_manual_response_create.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_user_idle_handler_uses_realtime_append_path():
    """The idle handler pushes exactly one run_llm append frame with the idle prompt."""
    expected_messages = [
        {
            "role": "user",
            "content": "The user has been quiet. Politely and briefly ask if they're still there in the language that the user has been speaking so far.",
        }
    ]
    # Minimal stand-ins for the engine and the aggregator the handler uses.
    fake_engine = SimpleNamespace(
        llm=SimpleNamespace(),
        end_call_with_reason=AsyncMock(),
    )
    fake_aggregator = SimpleNamespace(push_frame=AsyncMock())
    idle_handler = UserIdleHandler(fake_engine)

    await idle_handler.handle_idle(fake_aggregator)

    fake_aggregator.push_frame.assert_awaited_once()
    pushed = fake_aggregator.push_frame.await_args.args[0]
    assert isinstance(pushed, LLMMessagesAppendFrame)
    assert pushed.run_llm is True
    assert pushed.messages == expected_messages
|
||||
2
pipecat
2
pipecat
|
|
@ -1 +1 @@
|
|||
Subproject commit 17b474db8bdc8ae832825e9f601309c093fee0ed
|
||||
Subproject commit f780c6de083d607adc7779109cad37f8b5a7030d
|
||||
Loading…
Add table
Add a link
Reference in a new issue