feat: add openai realtime models

This commit is contained in:
Abhishek Kumar 2026-05-16 08:42:43 +05:30
parent 53f1959edf
commit 4d7b681928
33 changed files with 1518 additions and 75 deletions

View file

@ -29,6 +29,20 @@ class ToolParameter(BaseModel):
)
class PresetToolParameter(BaseModel):
"""A parameter injected by Dograh at runtime."""
name: str = Field(description="Parameter name (used as key in request body)")
type: str = Field(description="Parameter type: string, number, or boolean")
value_template: str = Field(
description="Fixed value or template, e.g. {{initial_context.phone_number}}"
)
required: bool = Field(
default=True,
description="Whether the parameter must resolve to a non-empty value",
)
class HttpApiConfig(BaseModel):
"""Configuration for HTTP API tools."""
@ -43,6 +57,10 @@ class HttpApiConfig(BaseModel):
parameters: Optional[List[ToolParameter]] = Field(
default=None, description="Parameters that the tool accepts from LLM"
)
preset_parameters: Optional[List[PresetToolParameter]] = Field(
default=None,
description="Parameters injected by Dograh from fixed values or workflow context templates",
)
timeout_ms: Optional[int] = Field(
default=5000, description="Request timeout in milliseconds"
)

View file

@ -34,6 +34,7 @@ class ServiceProviders(str, Enum):
RIME = "rime"
OPENAI_REALTIME = "openai_realtime"
GOOGLE_REALTIME = "google_realtime"
GOOGLE_VERTEX_REALTIME = "google_vertex_realtime"
class BaseServiceConfiguration(BaseModel):
@ -53,6 +54,7 @@ class BaseServiceConfiguration(BaseModel):
ServiceProviders.RIME,
ServiceProviders.OPENAI_REALTIME,
ServiceProviders.GOOGLE_REALTIME,
ServiceProviders.GOOGLE_VERTEX_REALTIME,
# ServiceProviders.SARVAM,
]
api_key: str | list[str]
@ -291,7 +293,7 @@ class SpeachesLLMConfiguration(BaseLLMConfiguration):
api_key: str | list[str] | None = Field(default=None)
OPENAI_REALTIME_MODELS = ["gpt-4o-realtime-preview", "gpt-4o-mini-realtime-preview"]
OPENAI_REALTIME_MODELS = ["gpt-realtime-2"]
OPENAI_REALTIME_VOICES = [
"alloy",
"ash",
@ -304,22 +306,25 @@ OPENAI_REALTIME_VOICES = [
]
# @register_service(ServiceType.REALTIME)
# class OpenAIRealtimeLLMConfiguration(BaseLLMConfiguration):
# provider: Literal[ServiceProviders.OPENAI_REALTIME] = (
# ServiceProviders.OPENAI_REALTIME
# )
# model: str = Field(
# default="gpt-4o-realtime-preview",
# json_schema_extra={
# "examples": OPENAI_REALTIME_MODELS,
# "allow_custom_input": True,
# },
# )
# voice: str = Field(
# default="alloy",
# json_schema_extra={"examples": OPENAI_REALTIME_VOICES},
# )
@register_service(ServiceType.REALTIME)
class OpenAIRealtimeLLMConfiguration(BaseLLMConfiguration):
provider: Literal[ServiceProviders.OPENAI_REALTIME] = (
ServiceProviders.OPENAI_REALTIME
)
model: str = Field(
default="gpt-realtime-2",
json_schema_extra={
"examples": OPENAI_REALTIME_MODELS,
"allow_custom_input": True,
},
)
voice: str = Field(
default="alloy",
json_schema_extra={
"examples": OPENAI_REALTIME_VOICES,
"allow_custom_input": True,
},
)
GOOGLE_REALTIME_MODELS = ["gemini-3.1-flash-live-preview"]
@ -381,9 +386,58 @@ class GoogleRealtimeLLMConfiguration(BaseLLMConfiguration):
)
GOOGLE_VERTEX_REALTIME_MODELS = [
"google/gemini-live-2.5-flash-native-audio",
]
GOOGLE_VERTEX_REALTIME_VOICES = GOOGLE_REALTIME_VOICES
GOOGLE_VERTEX_REALTIME_LANGUAGES = GOOGLE_REALTIME_LANGUAGES
@register_service(ServiceType.REALTIME)
class GoogleVertexRealtimeLLMConfiguration(BaseLLMConfiguration):
provider: Literal[ServiceProviders.GOOGLE_VERTEX_REALTIME] = (
ServiceProviders.GOOGLE_VERTEX_REALTIME
)
model: str = Field(
default="google/gemini-live-2.5-flash-native-audio",
json_schema_extra={
"examples": GOOGLE_VERTEX_REALTIME_MODELS,
"allow_custom_input": True,
},
)
voice: str = Field(
default="Charon",
json_schema_extra={
"examples": GOOGLE_VERTEX_REALTIME_VOICES,
"allow_custom_input": True,
},
)
language: str = Field(
default="en-US",
json_schema_extra={
"examples": GOOGLE_VERTEX_REALTIME_LANGUAGES,
"allow_custom_input": True,
},
)
project_id: str = Field(description="Google Cloud project ID for Vertex AI.")
location: str = Field(
default="us-east4",
description="GCP region for the Vertex AI endpoint (e.g. 'us-east4').",
)
credentials: str | None = Field(
default=None,
description=(
"Service account JSON credentials string. If omitted, falls back to "
"Application Default Credentials (ADC)."
),
)
api_key: str | list[str] | None = Field(default=None)
REALTIME_PROVIDERS = {
ServiceProviders.OPENAI_REALTIME.value,
ServiceProviders.GOOGLE_REALTIME.value,
ServiceProviders.GOOGLE_VERTEX_REALTIME.value,
}
@ -403,8 +457,9 @@ LLMConfig = Annotated[
RealtimeConfig = Annotated[
Union[
# OpenAIRealtimeLLMConfiguration,
OpenAIRealtimeLLMConfiguration,
GoogleRealtimeLLMConfiguration,
GoogleVertexRealtimeLLMConfiguration,
],
Field(discriminator="provider"),
]

View file

@ -0,0 +1,9 @@
"""Dograh-specific subclasses of pipecat realtime LLM services.
Each subclass wires Dograh engine integration quirks (user-mute gating,
TTSSpeakFrame greeting trigger, node-transition handling, function-call
deferral, etc.) onto the corresponding pipecat realtime service.
The pipecat fork's services stay close to upstream — Dograh behavior lives
here.
"""

View file

@ -0,0 +1,237 @@
"""Dograh subclass of pipecat's Gemini Live LLM service.
Layers Dograh engine integration quirks onto upstream-pristine
:class:`GeminiLiveLLMService`:
- **Deferred connect.** Connection is held back until ``system_instruction``
is set via :meth:`_update_settings`, so pre-call-fetch template variables
land before the live session opens.
- **Reconnect on node transitions.** Gemini Live cannot update
``system_instruction`` mid-session, so a setting change triggers a
reconnect (deferred until the bot turn ends if currently responding).
- **Function-call deferral.** Tool calls emitted mid-turn are queued and run
when the bot stops speaking, to avoid racing the turn's audio.
- **User-mute audio gating.** ``UserMuteStarted/StoppedFrame`` from the
user aggregator gates whether incoming audio is forwarded to Gemini.
- **TTSSpeakFrame as greeting trigger.** The engine queues a TTSSpeakFrame
to kick off the first response after node setup; the service intercepts
it and runs the initial-context path.
- **Finalize-pending on transcriptions.** Marks the transcription emitted
immediately after VAD-stop as finalized, distinguishing it from
mid-turn partials.
"""
from typing import Any
from loguru import logger
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
Frame,
TranscriptionFrame,
TTSSpeakFrame,
UserMuteStartedFrame,
UserMuteStoppedFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.services.llm_service import FunctionCallFromLLM
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_gemini_live
class DograhGeminiLiveLLMService(GeminiLiveLLMService):
"""Gemini Live with Dograh engine integration quirks. See module docstring."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
# User-mute state, driven by broadcast UserMute{Started,Stopped}Frames.
# Audio is not forwarded to Gemini while muted.
self._user_is_muted: bool = False
# Guards initial-response triggering against double-firing across the
# initial TTSSpeakFrame and any LLMContextFrame that may arrive.
self._handled_initial_context: bool = False
# When a system_instruction change arrives mid-bot-turn, the reconnect
# is queued and drained when the turn ends.
self._reconnect_pending: bool = False
# Function calls emitted by Gemini mid-bot-turn are deferred here and
# invoked when the turn ends, so they don't race the turn's audio.
self._pending_function_calls: list[FunctionCallFromLLM] = []
# Tracks whether the next transcription to arrive should be marked as
# the finalized transcription for the current user turn.
self._finalize_pending: bool = False
# ------------------------------------------------------------------
# Hooks from upstream GeminiLiveLLMService
# ------------------------------------------------------------------
def _should_connect_on_start(self) -> bool:
# Hold the connection until the engine sets a system_instruction. This
# lets pre-call fetch populate template variables first.
return bool(self._settings.system_instruction)
async def _handle_changed_settings(self, changed: dict[str, Any]) -> set[str]:
if "system_instruction" not in changed:
return set()
if not self._session:
# First-time setting after deferred-connect.
await self._connect()
elif self._bot_is_responding:
# Bot is mid-turn — drain the reconnect when it ends so we don't
# cut the bot off mid-utterance.
self._reconnect_pending = True
else:
await self._reconnect()
return {"system_instruction"}
async def _run_or_defer_function_calls(
self, function_calls_llm: list[FunctionCallFromLLM]
):
if self._bot_is_responding:
# Latest batch wins; Gemini emits tool calls as one batch per
# tool_call message, so this overwrite is intentional.
self._pending_function_calls = function_calls_llm
logger.debug(
f"{self}: deferring {len(function_calls_llm)} function call(s) "
"until bot turn ends"
)
return
await super()._run_or_defer_function_calls(function_calls_llm)
# ------------------------------------------------------------------
# State-transition side effects
# ------------------------------------------------------------------
async def _set_bot_is_responding(self, responding: bool):
was_responding = self._bot_is_responding
await super()._set_bot_is_responding(responding)
if was_responding and not responding:
await self._run_pending_function_calls()
if self._reconnect_pending:
self._reconnect_pending = False
await self._reconnect()
async def _run_pending_function_calls(self):
"""Run any function calls deferred during the bot's last turn."""
if not self._pending_function_calls:
return
fcs = self._pending_function_calls
self._pending_function_calls = []
logger.debug(
f"{self}: executing {len(fcs)} deferred function call(s) "
"after bot turn ended"
)
await self.run_function_calls(fcs)
# ------------------------------------------------------------------
# Frame handling: mute, TTSSpeakFrame, BotStoppedSpeakingFrame flush
# ------------------------------------------------------------------
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, UserMuteStartedFrame):
self._user_is_muted = True
await self.push_frame(frame, direction)
return
if isinstance(frame, UserMuteStoppedFrame):
self._user_is_muted = False
await self.push_frame(frame, direction)
return
if isinstance(frame, TTSSpeakFrame):
# Greeting trigger: the engine queues a TTSSpeakFrame to start the
# bot's first turn after node setup. Gemini Live renders its own
# audio, so we don't pass the frame through — we re-enter
# _handle_context to kick off the initial response.
if not self._handled_initial_context:
await self._handle_context(self._context)
else:
logger.warning(
f"{self}: TTSSpeakFrame after initial context already "
"handled — Gemini Live owns audio generation, ignoring"
)
return
if isinstance(frame, BotStoppedSpeakingFrame):
# Belt-and-suspenders: the main drain happens in
# _set_bot_is_responding(False), but if Gemini delays turn_complete
# past the audible end of the turn, flushing here ensures pending
# function calls fire promptly.
await self._run_pending_function_calls()
# Fall through to super for the actual push.
await super().process_frame(frame, direction)
async def _send_user_audio(self, frame):
if self._user_is_muted:
return
await super()._send_user_audio(frame)
# ------------------------------------------------------------------
# Context lifecycle: Dograh pre-populates self._context via the engine,
# so upstream's "first arrival === self._context is None" check doesn't
# work. We gate on _handled_initial_context instead and skip the
# init-instruction reconciliation (Dograh updates system_instruction at
# runtime via _update_settings, not via init).
# ------------------------------------------------------------------
async def _handle_context(self, context: LLMContext):
if not self._handled_initial_context:
self._handled_initial_context = True
self._context = context
await self._create_initial_response()
else:
self._context = context
await self._process_completed_function_calls(send_new_results=True)
# ------------------------------------------------------------------
# Session lifecycle: drop upstream's automatic reconnect-seed and
# initial-context-seed paths. The TTSSpeakFrame trigger and the
# function-call-result LLMContextFrame are the only paths that should
# kick off bot turns in the Dograh flow.
# ------------------------------------------------------------------
@traced_gemini_live(operation="llm_setup")
async def _handle_session_ready(self, session):
logger.debug(
f"In _handle_session_ready self._run_llm_when_session_ready: {self._run_llm_when_session_ready}"
)
self._session = session
self._ready_for_realtime_input = True
if self._run_llm_when_session_ready:
# Context arrived before session was ready — fulfil the queued
# initial response now.
self._run_llm_when_session_ready = False
await self._create_initial_response()
await self._drain_pending_tool_results()
# Otherwise: no automatic seed. Reconnect after a session-resumption
# update relies on the server-side restored state; reconnects without
# a handle (e.g. node transitions before any handle was issued) are
# followed by a function-call-result LLMContextFrame which feeds the
# updated-context branch in _handle_context.
# ------------------------------------------------------------------
# Transcription: broadcast (so downstream voicemail detector and
# logs buffer both see it) and set finalized= for turn-boundary
# semantics.
# ------------------------------------------------------------------
async def _handle_user_started_speaking(self, frame):
await super()._handle_user_started_speaking(frame)
# A new VAD start invalidates any pending finalize from a prior stop
# that hasn't been paired with a transcription yet.
self._finalize_pending = False
async def _handle_user_stopped_speaking(self, frame):
await super()._handle_user_stopped_speaking(frame)
self._finalize_pending = True
async def _push_user_transcription(self, text: str, result=None):
await self._handle_user_transcription(text, True, self._settings.language)
finalized = self._finalize_pending
self._finalize_pending = False
await self.broadcast_frame(
TranscriptionFrame,
text=text,
user_id="",
timestamp=time_now_iso8601(),
result=result,
finalized=finalized,
)

View file

@ -0,0 +1,42 @@
"""Dograh subclass of pipecat's Gemini Live Vertex AI LLM service.
Diamond inheritance: combines the Dograh engine-integration overrides from
:class:`DograhGeminiLiveLLMService` with the Vertex-specific tweaks from
upstream's :class:`GeminiLiveVertexLLMService` (no history config,
``NON_BLOCKING`` tools disabled, service-account credentials).
MRO::
DograhGeminiLiveVertexLLMService
-> DograhGeminiLiveLLMService
-> GeminiLiveVertexLLMService
-> GeminiLiveLLMService
-> LLMService
-> ...
"""
from api.services.pipecat.realtime.gemini_live import DograhGeminiLiveLLMService
from pipecat.services.google.gemini_live.vertex.llm import (
GeminiLiveVertexLLMService,
)
class DograhGeminiLiveVertexLLMService(
DograhGeminiLiveLLMService,
GeminiLiveVertexLLMService,
):
"""Vertex AI variant of Gemini Live with Dograh integration quirks."""
pass
# Guard against MRO regressions: a future refactor that flips inheritance
# order or breaks the diamond would silently bypass the Dograh overrides.
_mro = DograhGeminiLiveVertexLLMService.__mro__
assert _mro[1] is DograhGeminiLiveLLMService, (
f"Expected DograhGeminiLiveLLMService at MRO[1], got {_mro[1]}"
)
assert _mro[2] is GeminiLiveVertexLLMService, (
f"Expected GeminiLiveVertexLLMService at MRO[2], got {_mro[2]}"
)
del _mro

View file

@ -0,0 +1,177 @@
"""Dograh subclass of pipecat's OpenAI Realtime LLM service.
Layers Dograh engine integration quirks onto upstream-pristine
:class:`OpenAIRealtimeLLMService`. Substantially smaller than the Gemini
subclass because OpenAI Realtime supports runtime ``session.update`` for
both ``system_instruction`` and tools no reconnect/defer-tool-call
machinery needed.
Adds:
- **User-mute audio gating** via ``UserMuteStarted/StoppedFrame``.
- **TTSSpeakFrame as initial-response trigger** so the engine's greeting
flow kicks off the bot's first response.
- **finalized=True on TranscriptionFrame** for parity with the Gemini
service (every OpenAI transcription via the ``completed`` event is
final by construction).
"""
import json
from loguru import logger
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
TranscriptionFrame,
TTSSpeakFrame,
UserMuteStartedFrame,
UserMuteStoppedFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
class DograhOpenAIRealtimeLLMService(OpenAIRealtimeLLMService):
"""OpenAI Realtime with Dograh engine integration quirks. See module docstring."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._user_is_muted: bool = False
# Dograh pre-populates self._context via the engine before the first
# LLMContextFrame arrives, so upstream's "first arrival means
# self._context is None" check no longer works.
self._handled_initial_context: bool = False
# Track bot speech locally so tool calls can be deferred until the bot
# has finished speaking, matching Dograh's Gemini Live behavior.
self._bot_is_speaking: bool = False
self._deferred_function_calls: list[FunctionCallFromLLM] = []
# ------------------------------------------------------------------
# Frame handling: mute, TTSSpeakFrame as greeting trigger
# ------------------------------------------------------------------
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, UserMuteStartedFrame):
self._user_is_muted = True
await self.push_frame(frame, direction)
return
if isinstance(frame, UserMuteStoppedFrame):
self._user_is_muted = False
await self.push_frame(frame, direction)
return
if isinstance(frame, TTSSpeakFrame):
# Greeting trigger: the engine queues a TTSSpeakFrame after node
# setup. OpenAI Realtime renders its own audio, so we don't pass
# the frame to TTS. Route through _handle_context so the initial
# response and later tool-result turns share the same context
# lifecycle even when Dograh has already pre-populated self._context.
if not self._handled_initial_context:
await self._handle_context(self._context)
else:
logger.warning(
f"{self}: TTSSpeakFrame after initial context already "
"handled — OpenAI Realtime owns audio generation, ignoring"
)
# Don't forward the frame; the audio path is owned by the realtime
# service itself.
return
if isinstance(frame, BotStartedSpeakingFrame):
self._bot_is_speaking = True
elif isinstance(frame, BotStoppedSpeakingFrame):
self._bot_is_speaking = False
await self._run_pending_function_calls()
await super().process_frame(frame, direction)
async def _handle_context(self, context: LLMContext):
if not self._handled_initial_context:
if context is None:
logger.warning(
f"{self}: received initial context trigger before context was set"
)
return
self._handled_initial_context = True
self._context = context
await self._create_response()
else:
self._context = context
await self._process_completed_function_calls(send_new_results=True)
async def _send_user_audio(self, frame):
if self._user_is_muted:
return
await super()._send_user_audio(frame)
async def _run_pending_function_calls(self):
if not self._deferred_function_calls:
return
function_calls = self._deferred_function_calls
self._deferred_function_calls = []
logger.debug(
f"{self}: executing {len(function_calls)} deferred function call(s) "
"after bot turn ended"
)
await self.run_function_calls(function_calls)
async def _handle_evt_function_call_arguments_done(self, evt):
"""Process or defer tool calls until the bot finishes speaking."""
try:
args = json.loads(evt.arguments)
function_call_item = self._pending_function_calls.get(evt.call_id)
if function_call_item:
del self._pending_function_calls[evt.call_id]
function_calls = [
FunctionCallFromLLM(
context=self._context,
tool_call_id=evt.call_id,
function_name=function_call_item.name,
arguments=args,
)
]
if self._bot_is_speaking:
self._deferred_function_calls.extend(function_calls)
logger.debug(
f"{self}: deferring function call {function_call_item.name} "
"until bot stops speaking"
)
else:
await self.run_function_calls(function_calls)
logger.debug(f"Processed function call: {function_call_item.name}")
else:
logger.warning(
f"No tracked function call found for call_id: {evt.call_id}"
)
logger.warning(
f"Available pending calls: {list(self._pending_function_calls.keys())}"
)
except Exception as e:
logger.error(f"Failed to process function call arguments: {e}")
# ------------------------------------------------------------------
# Transcription: broadcast with finalized=True for parity with the
# Gemini service (consumers that check `finalized` should see True
# for every completed-transcription event from OpenAI).
# ------------------------------------------------------------------
async def handle_evt_input_audio_transcription_completed(self, evt):
await self._call_event_handler(
"on_conversation_item_updated", evt.item_id, None
)
await self.broadcast_frame(
TranscriptionFrame,
text=evt.transcript,
user_id="",
timestamp=time_now_iso8601(),
result=evt,
finalized=True,
)
await self._handle_user_transcription(evt.transcript, True, Language.EN)

View file

@ -86,6 +86,43 @@ from pipecat.utils.run_context import set_current_org_id, set_current_run_id
ensure_tracing()
def _create_realtime_user_turn_config(provider: str):
"""Return user turn strategies and optional local VAD for realtime providers."""
if provider in {
ServiceProviders.GOOGLE_REALTIME.value,
ServiceProviders.GOOGLE_VERTEX_REALTIME.value,
}:
# Let Gemini Live own barge-in via its server-side VAD, but keep local
# Silero VAD for early user-turn start and speaking-state tracking.
return (
UserTurnStrategies(
start=[VADUserTurnStartStrategy(enable_interruptions=False)],
stop=[SpeechTimeoutUserTurnStopStrategy()],
),
SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
)
if provider == ServiceProviders.OPENAI_REALTIME.value:
# OpenAI Realtime already emits speaking-state frames and interruption
# events from the provider, so the aggregator should follow those
# external signals rather than run its own local VAD.
return (
UserTurnStrategies(
start=[ExternalUserTurnStartStrategy()],
stop=[ExternalUserTurnStopStrategy()],
),
None,
)
return (
UserTurnStrategies(
start=[VADUserTurnStartStrategy()],
stop=[SpeechTimeoutUserTurnStopStrategy()],
),
SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
)
async def run_pipeline_telephony(
websocket,
*,
@ -138,6 +175,20 @@ async def run_pipeline_telephony(
"telephony_configuration_id"
)
# Resolve effective user config here so the transport can tune its
# bot-stopped-speaking fallback based on is_realtime; pass the resolved
# values into _run_pipeline so it doesn't fetch them again.
from api.services.configuration.resolve import resolve_effective_config
user_config = await db_client.get_user_configurations(user_id)
run_configs = (
(workflow_run.definition.workflow_configurations or {}) if workflow_run else {}
)
user_config = resolve_effective_config(
user_config, run_configs.get("model_overrides")
)
is_realtime = bool(user_config.is_realtime and user_config.realtime is not None)
spec = telephony_registry.get(provider_name)
audio_config = create_audio_config(provider_name)
@ -148,6 +199,7 @@ async def run_pipeline_telephony(
workflow.organization_id,
ambient_noise_config=ambient_noise_config,
telephony_configuration_id=telephony_configuration_id,
is_realtime=is_realtime,
**transport_kwargs,
)
@ -158,6 +210,8 @@ async def run_pipeline_telephony(
workflow_run_id,
user_id,
audio_config=audio_config,
workflow_run=workflow_run,
resolved_user_config=user_config,
)
except Exception as e:
logger.error(
@ -198,11 +252,27 @@ async def run_pipeline_smallwebrtc(
# Create audio configuration for WebRTC
audio_config = create_audio_config(WorkflowRunMode.SMALLWEBRTC.value)
# Resolve workflow_run + effective user_config here so the transport can
# tune its bot-stopped-speaking fallback based on is_realtime. _run_pipeline
# reuses these via kwargs so we don't fetch twice.
from api.services.configuration.resolve import resolve_effective_config
workflow_run = await db_client.get_workflow_run(workflow_run_id, user_id)
user_config = await db_client.get_user_configurations(user_id)
run_configs = (
(workflow_run.definition.workflow_configurations or {}) if workflow_run else {}
)
user_config = resolve_effective_config(
user_config, run_configs.get("model_overrides")
)
is_realtime = bool(user_config.is_realtime and user_config.realtime is not None)
transport = await create_webrtc_transport(
webrtc_connection,
workflow_run_id,
audio_config,
ambient_noise_config,
is_realtime=is_realtime,
)
await _run_pipeline(
transport,
@ -212,6 +282,8 @@ async def run_pipeline_smallwebrtc(
call_context_vars=call_context_vars,
audio_config=audio_config,
user_provider_id=user_provider_id,
workflow_run=workflow_run,
resolved_user_config=user_config,
)
@ -223,6 +295,8 @@ async def _run_pipeline(
call_context_vars: dict = {},
audio_config: AudioConfig = None,
user_provider_id: str | None = None,
workflow_run=None,
resolved_user_config=None,
) -> None:
"""
Run the pipeline with the given transport and configuration
@ -232,9 +306,12 @@ async def _run_pipeline(
workflow_id: The ID of the workflow
workflow_run_id: The ID of the workflow run
user_id: The ID of the user
mode: The mode of the pipeline (twilio or smallwebrtc)
workflow_run: Pre-fetched workflow run row. Fetched here if None.
resolved_user_config: User configuration with model_overrides already
applied. Fetched and resolved here if None.
"""
workflow_run = await db_client.get_workflow_run(workflow_run_id, user_id)
if workflow_run is None:
workflow_run = await db_client.get_workflow_run(workflow_run_id, user_id)
# If the workflow run is already completed, we don't need to run it again
if workflow_run.is_completed:
@ -246,9 +323,6 @@ async def _run_pipeline(
if call_context_vars:
merged_call_context_vars = {**merged_call_context_vars, **call_context_vars}
# Get user configuration
user_config = await db_client.get_user_configurations(user_id)
# Get workflow for metadata (name, organization_id, call_disposition_codes)
workflow = await db_client.get_workflow(workflow_id, user_id)
if not workflow:
@ -286,11 +360,17 @@ async def _run_pipeline(
term.strip() for term in dictionary.split(",") if term.strip()
]
# Resolve model overrides from the version onto global user config
from api.services.configuration.resolve import resolve_effective_config
# Resolve model overrides from the version onto global user config (skip
# when the caller already resolved it).
if resolved_user_config is None:
from api.services.configuration.resolve import resolve_effective_config
model_overrides = run_configs.get("model_overrides")
user_config = resolve_effective_config(user_config, model_overrides)
user_config = await db_client.get_user_configurations(user_id)
user_config = resolve_effective_config(
user_config, run_configs.get("model_overrides")
)
else:
user_config = resolved_user_config
# Detect realtime mode (speech-to-speech services like OpenAI Realtime, Gemini Live)
is_realtime = user_config.is_realtime and user_config.realtime is not None
@ -453,23 +533,20 @@ async def _run_pipeline(
correct_aggregation_callback=engine.create_aggregation_correction_callback(),
)
user_mute_strategies = [
MuteUntilFirstBotCompleteUserMuteStrategy(),
FunctionCallUserMuteStrategy(),
CallbackUserMuteStrategy(should_mute_callback=engine.should_mute_user),
]
user_vad_analyzer = SileroVADAnalyzer(params=VADParams(stop_secs=0.2))
# Configure turn strategies based on STT provider, model, and workflow configuration
if is_realtime:
# Realtime services do server-side turn detection for response generation,
# but we still need a client-side stop strategy so the user aggregator emits
# UserStoppedSpeakingFrame. Without it, downstream consumers (e.g. voicemail
# detector) and Gemini Live's _finalize_pending flag never see a turn end.
user_turn_strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy()],
stop=[SpeechTimeoutUserTurnStopStrategy()],
# Realtime services still need user-turn tracking even when the model
# itself owns speech generation and interruption behavior.
user_turn_strategies, user_vad_analyzer = _create_realtime_user_turn_config(
user_config.realtime.provider
)
# Lets not start the pipeline as muted for Realtime
# - CallbackUserMuteStrategy: mutes based on engine's _mute_pipeline state
user_mute_strategies = [
FunctionCallUserMuteStrategy(),
CallbackUserMuteStrategy(should_mute_callback=engine.should_mute_user),
]
else:
# Deepgram Flux uses external turn detection (VAD + External start/stop)
# Other models use configurable turn detection strategy
@ -510,18 +587,11 @@ async def _run_pipeline(
stop=[SpeechTimeoutUserTurnStopStrategy()],
)
# - CallbackUserMuteStrategy: mutes based on engine's _mute_pipeline state
user_mute_strategies = [
MuteUntilFirstBotCompleteUserMuteStrategy(),
FunctionCallUserMuteStrategy(),
CallbackUserMuteStrategy(should_mute_callback=engine.should_mute_user),
]
user_params = LLMUserAggregatorParams(
user_turn_strategies=user_turn_strategies,
user_mute_strategies=user_mute_strategies,
user_idle_timeout=max_user_idle_timeout,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
vad_analyzer=user_vad_analyzer,
)
context_aggregator = LLMContextAggregatorPair(
context, assistant_params=assistant_params, user_params=user_params
@ -562,15 +632,14 @@ async def _run_pipeline(
)
engine.set_fetch_recording_audio(fetch_audio)
# Voicemail detection works in both modes. In realtime mode the detector sits
# after the realtime LLM and consumes the TranscriptionFrames it broadcasts;
# the LLM gate / TTS gate are not used (the realtime LLM responds to audio
# directly, not LLMContextFrames), so on detection we rely on
# end_call_with_reason to drop the call.
voicemail_config = (workflow.workflow_configurations or {}).get(
"voicemail_detection", {}
)
if voicemail_config.get("enabled", False):
if is_realtime and voicemail_config.get("enabled", False):
logger.info(
f"Disabling voicemail detection for realtime workflow run {workflow_run_id}"
)
if voicemail_config.get("enabled", False) and not is_realtime:
logger.info(f"Voicemail detection enabled for workflow run {workflow_run_id}")
# Create a separate LLM instance for the voicemail sub-pipeline
# (can't share with main pipeline as it would mess up frame linking)

View file

@ -493,6 +493,9 @@ def create_realtime_llm_service(user_config, audio_config: "AudioConfig"):
)
if provider == ServiceProviders.OPENAI_REALTIME.value:
from api.services.pipecat.realtime.openai_realtime import (
DograhOpenAIRealtimeLLMService,
)
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
@ -500,11 +503,10 @@ def create_realtime_llm_service(user_config, audio_config: "AudioConfig"):
InputAudioTranscription,
SessionProperties,
)
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
return OpenAIRealtimeLLMService(
return DograhOpenAIRealtimeLLMService(
api_key=api_key,
settings=OpenAIRealtimeLLMService.Settings(
settings=DograhOpenAIRealtimeLLMService.Settings(
model=model,
session_properties=SessionProperties(
audio=AudioConfiguration(
@ -519,7 +521,9 @@ def create_realtime_llm_service(user_config, audio_config: "AudioConfig"):
),
)
elif provider == ServiceProviders.GOOGLE_REALTIME.value:
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from api.services.pipecat.realtime.gemini_live import (
DograhGeminiLiveLLMService,
)
# Gemini Live enables input/output audio transcription by default
# in its _connect() method — no need to configure it explicitly.
@ -529,9 +533,30 @@ def create_realtime_llm_service(user_config, audio_config: "AudioConfig"):
}
if language:
settings_kwargs["language"] = language
return GeminiLiveLLMService(
return DograhGeminiLiveLLMService(
api_key=api_key,
settings=GeminiLiveLLMService.Settings(**settings_kwargs),
settings=DograhGeminiLiveLLMService.Settings(**settings_kwargs),
)
elif provider == ServiceProviders.GOOGLE_VERTEX_REALTIME.value:
from api.services.pipecat.realtime.gemini_live_vertex import (
DograhGeminiLiveVertexLLMService,
)
project_id = getattr(realtime_config, "project_id", None)
location = getattr(realtime_config, "location", None) or "us-east4"
credentials = getattr(realtime_config, "credentials", None)
settings_kwargs = {
"model": model,
"voice": voice or "Charon",
}
if language:
settings_kwargs["language"] = language
return DograhGeminiLiveVertexLLMService(
credentials=credentials,
project_id=project_id,
location=location,
settings=DograhGeminiLiveVertexLLMService.Settings(**settings_kwargs),
)
else:
raise HTTPException(

View file

@ -0,0 +1,25 @@
"""Shared helpers for tuning pipecat ``TransportParams`` per run mode.
These live outside ``transport_setup.py`` (which is non-telephony only) so
that both the WebRTC factory there and the telephony provider factories
under ``api.services.telephony.providers/<name>/transport.py`` can call
into the same place.
"""
# Realtime (speech-to-speech) LLMs don't emit ``TTSStoppedFrame``, so the
# bot-stopped-speaking signal relies on the output-queue-drained fallback.
# The default 3s tail leaves a long gap before the assistant aggregator
# closes its turn; 0.5s keeps the conversation snappy without cutting into
# the bot's own audio (audio chunks arrive far more frequently than this).
REALTIME_BOT_VAD_STOP_SECS = 0.5
def realtime_param_overrides(is_realtime: bool) -> dict:
"""Return kwargs to splat into ``TransportParams`` for the given run mode.
Currently this only tunes ``bot_vad_stop_secs``; new realtime-specific
knobs should be added here so each transport stays a thin shim.
"""
if not is_realtime:
return {}
return {"bot_vad_stop_secs": REALTIME_BOT_VAD_STOP_SECS}

View file

@ -6,6 +6,7 @@ This module hosts only the shared, non-telephony transports (WebRTC, internal/Lo
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_mixer import build_audio_out_mixer
from api.services.pipecat.transport_params import realtime_param_overrides
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
@ -16,6 +17,7 @@ async def create_webrtc_transport(
workflow_run_id: int,
audio_config: AudioConfig,
ambient_noise_config: dict | None = None,
is_realtime: bool = False,
):
"""Create a transport for WebRTC connections."""
mixer = await build_audio_out_mixer(
@ -30,6 +32,7 @@ async def create_webrtc_transport(
audio_in_sample_rate=audio_config.transport_in_sample_rate,
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_mixer=mixer,
**realtime_param_overrides(is_realtime),
),
)

View file

@ -8,6 +8,7 @@ from pipecat.transports.websocket.fastapi import (
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_mixer import build_audio_out_mixer
from api.services.pipecat.transport_params import realtime_param_overrides
from api.services.telephony.factory import load_credentials_for_transport
from .serializers import AsteriskFrameSerializer
@ -22,6 +23,7 @@ async def create_transport(
*,
ambient_noise_config: dict | None = None,
telephony_configuration_id: int | None = None,
is_realtime: bool = False,
channel_id: str,
):
"""Create a transport for Asterisk ARI connections."""
@ -65,5 +67,6 @@ async def create_transport(
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_mixer=mixer,
serializer=serializer,
**realtime_param_overrides(is_realtime),
),
)

View file

@ -8,6 +8,7 @@ from pipecat.transports.websocket.fastapi import (
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_mixer import build_audio_out_mixer
from api.services.pipecat.transport_params import realtime_param_overrides
from api.services.telephony.factory import load_credentials_for_transport
from .serializers import CloudonixFrameSerializer
@ -22,6 +23,7 @@ async def create_transport(
*,
ambient_noise_config: dict | None = None,
telephony_configuration_id: int | None = None,
is_realtime: bool = False,
call_id: str,
stream_sid: str,
bearer_token: str | None = None,
@ -69,5 +71,6 @@ async def create_transport(
audio_out_mixer=mixer,
serializer=serializer,
audio_out_10ms_chunks=2,
**realtime_param_overrides(is_realtime),
),
)

View file

@ -8,6 +8,7 @@ from pipecat.transports.websocket.fastapi import (
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_mixer import build_audio_out_mixer
from api.services.pipecat.transport_params import realtime_param_overrides
from api.services.telephony.factory import load_credentials_for_transport
from .serializers import PlivoFrameSerializer
@ -21,6 +22,7 @@ async def create_transport(
*,
ambient_noise_config: dict | None = None,
telephony_configuration_id: int | None = None,
is_realtime: bool = False,
stream_id: str,
call_id: str,
):
@ -61,5 +63,6 @@ async def create_transport(
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_mixer=mixer,
serializer=serializer,
**realtime_param_overrides(is_realtime),
),
)

View file

@ -8,6 +8,7 @@ from pipecat.transports.websocket.fastapi import (
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_mixer import build_audio_out_mixer
from api.services.pipecat.transport_params import realtime_param_overrides
from api.services.telephony.factory import load_credentials_for_transport
from .serializers import TelnyxFrameSerializer
@ -22,6 +23,7 @@ async def create_transport(
*,
ambient_noise_config: dict | None = None,
telephony_configuration_id: int | None = None,
is_realtime: bool = False,
stream_id: str,
call_control_id: str,
encoding: str = "PCMU",
@ -64,5 +66,6 @@ async def create_transport(
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_mixer=mixer,
serializer=serializer,
**realtime_param_overrides(is_realtime),
),
)

View file

@ -8,6 +8,7 @@ from pipecat.transports.websocket.fastapi import (
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_mixer import build_audio_out_mixer
from api.services.pipecat.transport_params import realtime_param_overrides
from api.services.telephony.factory import load_credentials_for_transport
from .serializers import TwilioFrameSerializer
@ -22,6 +23,7 @@ async def create_transport(
*,
ambient_noise_config: dict | None = None,
telephony_configuration_id: int | None = None,
is_realtime: bool = False,
stream_sid: str,
call_sid: str,
):
@ -60,5 +62,6 @@ async def create_transport(
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_mixer=mixer,
serializer=serializer,
**realtime_param_overrides(is_realtime),
),
)

View file

@ -14,6 +14,7 @@ from pipecat.transports.websocket.fastapi import (
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_mixer import build_audio_out_mixer
from api.services.pipecat.transport_params import realtime_param_overrides
from api.services.telephony.factory import load_credentials_for_transport
from .serializers import VobizFrameSerializer
@ -27,6 +28,7 @@ async def create_transport(
*,
ambient_noise_config: dict | None = None,
telephony_configuration_id: int | None = None,
is_realtime: bool = False,
stream_id: str,
call_id: str,
):
@ -72,6 +74,7 @@ async def create_transport(
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_mixer=mixer,
serializer=serializer,
**realtime_param_overrides(is_realtime),
),
)

View file

@ -7,6 +7,7 @@ from pipecat.transports.websocket.fastapi import (
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_mixer import build_audio_out_mixer
from api.services.pipecat.transport_params import realtime_param_overrides
from api.services.telephony.factory import load_credentials_for_transport
from .serializers import VonageFrameSerializer
@ -20,6 +21,7 @@ async def create_transport(
*,
ambient_noise_config: dict | None = None,
telephony_configuration_id: int | None = None,
is_realtime: bool = False,
call_uuid: str,
):
"""Create a transport for Vonage connections."""
@ -59,5 +61,6 @@ async def create_transport(
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_mixer=mixer,
serializer=serializer,
**realtime_param_overrides(is_realtime),
),
)

View file

@ -321,6 +321,7 @@ class CustomToolManager:
tool=tool,
arguments=function_call_params.arguments,
call_context_vars=self._engine._call_context_vars,
gathered_context_vars=self._engine._gathered_context,
organization_id=await self.get_organization_id(),
)

View file

@ -1,5 +1,6 @@
"""Custom tool execution for user-defined HTTP API tools."""
import json
import re
from typing import Any, Dict, Optional
@ -8,6 +9,7 @@ from loguru import logger
from api.db import db_client
from api.utils.credential_auth import build_auth_header
from api.utils.template_renderer import render_template
# Map tool parameter types to JSON schema types
TYPE_MAP = {
@ -84,10 +86,94 @@ def tool_to_function_schema(tool: Any) -> Dict[str, Any]:
}
def _coerce_parameter_value(value: Any, param_type: str) -> Any:
"""Coerce a rendered preset parameter into the configured JSON type."""
if value is None:
return None
if param_type == "string":
if isinstance(value, str):
return value
if isinstance(value, (dict, list)):
return json.dumps(value)
return str(value)
if param_type == "number":
if isinstance(value, (int, float)) and not isinstance(value, bool):
return value
rendered = str(value).strip()
if rendered == "":
return None
if re.fullmatch(r"[-+]?\d+", rendered):
return int(rendered)
return float(rendered)
if param_type == "boolean":
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
rendered = str(value).strip().lower()
if rendered in {"true", "1", "yes", "y", "on"}:
return True
if rendered in {"false", "0", "no", "n", "off"}:
return False
raise ValueError(f"Cannot convert '{value}' to boolean")
return value
def _resolve_preset_parameters(
config: Dict[str, Any],
call_context_vars: Optional[Dict[str, Any]],
gathered_context_vars: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
"""Resolve fixed/template-backed parameters before executing the HTTP request."""
preset_parameters = config.get("preset_parameters", []) or []
if not preset_parameters:
return {}
initial_context = dict(call_context_vars or {})
render_context: Dict[str, Any] = {
**initial_context,
"initial_context": initial_context,
"gathered_context": dict(gathered_context_vars or {}),
}
resolved: Dict[str, Any] = {}
for param in preset_parameters:
param_name = (param.get("name") or "").strip()
if not param_name:
continue
rendered = render_template(param.get("value_template", ""), render_context)
if rendered in (None, ""):
if param.get("required", True):
raise ValueError(
f"Preset parameter '{param_name}' resolved to an empty value"
)
continue
resolved[param_name] = _coerce_parameter_value(
rendered, param.get("type", "string")
)
return resolved
async def execute_http_tool(
tool: Any,
arguments: Dict[str, Any],
call_context_vars: Optional[Dict[str, Any]] = None,
gathered_context_vars: Optional[Dict[str, Any]] = None,
organization_id: Optional[int] = None,
) -> Dict[str, Any]:
"""Execute an HTTP API tool.
@ -95,7 +181,8 @@ async def execute_http_tool(
Args:
tool: ToolModel instance
arguments: Arguments passed by the LLM (parameter name -> value)
call_context_vars: Additional context variables from the call (unused for now)
call_context_vars: Initial context variables available at runtime
gathered_context_vars: Variables extracted during the conversation
organization_id: Organization ID for credential lookup
Returns:
@ -133,17 +220,31 @@ async def execute_http_tool(
timeout_ms = config.get("timeout_ms", 5000)
timeout_seconds = timeout_ms / 1000
try:
preset_arguments = _resolve_preset_parameters(
config, call_context_vars, gathered_context_vars
)
except ValueError as e:
logger.error(f"Custom tool '{tool.name}' preset parameter error: {e}")
return {"status": "error", "error": str(e)}
resolved_arguments = {**(arguments or {}), **preset_arguments}
# Build request: JSON body for POST/PUT/PATCH, query params for GET/DELETE
body = None
params = None
if method in ("POST", "PUT", "PATCH"):
body = arguments
elif method in ("GET", "DELETE") and arguments:
params = arguments
body = resolved_arguments
elif method in ("GET", "DELETE") and resolved_arguments:
params = resolved_arguments
logger.info(
f"Executing custom tool '{tool.name}' ({tool.tool_uuid}): {method} {url}"
)
if preset_arguments:
logger.debug(
f"Resolved preset parameters for '{tool.name}': {list(preset_arguments.keys())}"
)
logger.debug(f"Request body: {body}, params: {params}")
try:

View file

@ -140,6 +140,45 @@ class TestToolToFunctionSchema:
assert "duration_minutes" in required
assert "is_priority" not in required
def test_preset_parameters_are_not_exposed_to_llm_schema(self):
"""Test that preset parameters are injected at runtime, not shown to the LLM."""
tool = MockToolModel(
tool_uuid="test-uuid-preset",
name="Lookup Customer",
description="Lookup a customer using contextual identifiers",
category="http_api",
definition={
"schema_version": 1,
"type": "http_api",
"config": {
"method": "POST",
"url": "https://api.example.com/customers/lookup",
"parameters": [
{
"name": "customer_name",
"type": "string",
"description": "Customer name spoken by the caller",
"required": True,
}
],
"preset_parameters": [
{
"name": "phone_number",
"type": "string",
"value_template": "{{initial_context.phone_number}}",
"required": True,
}
],
},
},
)
schema = tool_to_function_schema(tool)
props = schema["function"]["parameters"]["properties"]
assert "customer_name" in props
assert "phone_number" not in props
def test_tool_name_sanitization(self):
"""Test that tool names with special characters are sanitized."""
tool = MockToolModel(
@ -255,6 +294,108 @@ class TestExecuteHttpTool:
assert result["status_code"] == 201
assert result["data"]["id"] == 123
@pytest.mark.asyncio
async def test_post_request_injects_preset_parameters(self):
"""Test that preset parameters are resolved from runtime context."""
tool = MockToolModel(
tool_uuid="test-uuid-preset",
name="Create Lead",
description="Create a lead with caller context",
category="http_api",
definition={
"schema_version": 1,
"type": "http_api",
"config": {
"method": "POST",
"url": "https://api.example.com/leads",
"timeout_ms": 5000,
"preset_parameters": [
{
"name": "phone_number",
"type": "string",
"value_template": "{{initial_context.phone_number}}",
"required": True,
},
{
"name": "customer_id",
"type": "number",
"value_template": "{{gathered_context.customer_id}}",
"required": True,
},
{
"name": "is_vip",
"type": "boolean",
"value_template": "{{initial_context.is_vip}}",
"required": False,
},
],
},
},
)
arguments = {"name": "John"}
with patch(
"api.services.workflow.tools.custom_tool.httpx.AsyncClient"
) as mock_client_class:
mock_client = AsyncMock()
mock_response = Mock()
mock_response.status_code = 201
mock_response.json.return_value = {"id": 123}
mock_client.request.return_value = mock_response
mock_client_class.return_value.__aenter__.return_value = mock_client
result = await execute_http_tool(
tool,
arguments,
call_context_vars={
"phone_number": "+14155550123",
"is_vip": "true",
},
gathered_context_vars={"customer_id": "42"},
)
call_kwargs = mock_client.request.call_args.kwargs
assert call_kwargs["json"] == {
"name": "John",
"phone_number": "+14155550123",
"customer_id": 42,
"is_vip": True,
}
assert result["status"] == "success"
@pytest.mark.asyncio
async def test_missing_required_preset_parameter_returns_error(self):
"""Test that required preset parameters fail before the HTTP request."""
tool = MockToolModel(
tool_uuid="test-uuid-preset-error",
name="Create Lead",
description="Create a lead with caller context",
category="http_api",
definition={
"schema_version": 1,
"type": "http_api",
"config": {
"method": "POST",
"url": "https://api.example.com/leads",
"timeout_ms": 5000,
"preset_parameters": [
{
"name": "phone_number",
"type": "string",
"value_template": "{{initial_context.phone_number}}",
"required": True,
}
],
},
},
)
result = await execute_http_tool(tool, {"name": "John"}, call_context_vars={})
assert result["status"] == "error"
assert "phone_number" in result["error"]
@pytest.mark.asyncio
async def test_get_request_sends_query_params(self):
"""Test that GET requests send arguments as query parameters."""

View file

@ -0,0 +1,86 @@
import json
from types import SimpleNamespace
from unittest.mock import AsyncMock
import pytest
from pipecat.processors.aggregators.llm_context import LLMContext
from api.services.pipecat.realtime.gemini_live import DograhGeminiLiveLLMService
class _TestDograhGeminiLiveLLMService(DograhGeminiLiveLLMService):
"""Dograh Gemini service with client creation stubbed for unit tests."""
def create_client(self):
self._client = SimpleNamespace(
aio=SimpleNamespace(live=SimpleNamespace(connect=None))
)
class _FakeSession:
def __init__(self):
self.send_tool_response = AsyncMock()
self.send_realtime_input = AsyncMock()
self.close = AsyncMock()
def _make_service() -> _TestDograhGeminiLiveLLMService:
service = _TestDograhGeminiLiveLLMService(api_key="test-key")
service.stop_all_metrics = AsyncMock()
service.start_ttfb_metrics = AsyncMock()
service.cancel_task = AsyncMock()
service.push_error = AsyncMock()
return service
def _make_tool_result_context(tool_call_id: str) -> LLMContext:
return LLMContext(
messages=[
{
"role": "tool",
"content": json.dumps({"status": "done"}),
"tool_call_id": tool_call_id,
}
]
)
@pytest.mark.asyncio
async def test_updated_context_during_reconnect_keeps_result_pending_until_session_ready():
service = _make_service()
service._handled_initial_context = True
service._tool_call_id_to_name = {"call-transition": "transition_to_next_node"}
service._session = _FakeSession()
context = _make_tool_result_context("call-transition")
await service._disconnect()
await service._handle_context(context)
# A reconnect gap should not count as successful delivery to Gemini.
assert "call-transition" not in service._completed_tool_calls
session = _FakeSession()
await service._handle_session_ready(session)
session.send_tool_response.assert_awaited_once()
sent_response = session.send_tool_response.await_args.kwargs["function_responses"]
assert sent_response.id == "call-transition"
assert sent_response.name == "transition_to_next_node"
assert "call-transition" in service._completed_tool_calls
@pytest.mark.asyncio
async def test_disconnect_does_not_forget_previously_delivered_tool_results():
service = _make_service()
service._context = _make_tool_result_context("call-transition")
service._completed_tool_calls = {"call-transition"}
service._tool_call_id_to_name = {"call-transition": "transition_to_next_node"}
service._session = _FakeSession()
service._tool_result = AsyncMock()
await service._disconnect()
await service._process_completed_function_calls(send_new_results=True)
service._tool_result.assert_not_awaited()
assert service._completed_tool_calls == {"call-transition"}

View file

@ -0,0 +1,98 @@
from types import SimpleNamespace
from unittest.mock import AsyncMock
import pytest
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from api.services.pipecat.realtime.openai_realtime import (
DograhOpenAIRealtimeLLMService,
)
def _make_service() -> DograhOpenAIRealtimeLLMService:
service = DograhOpenAIRealtimeLLMService(api_key="test-key")
service._create_response = AsyncMock()
service._process_completed_function_calls = AsyncMock()
return service
@pytest.mark.asyncio
async def test_initial_context_triggers_response_when_context_was_prepopulated():
service = _make_service()
context = LLMContext()
service._context = context
await service._handle_context(context)
assert service._handled_initial_context is True
assert service._context is context
service._create_response.assert_awaited_once()
service._process_completed_function_calls.assert_not_awaited()
@pytest.mark.asyncio
async def test_updated_context_uses_tool_result_path_after_initial_context():
service = _make_service()
context = LLMContext()
service._handled_initial_context = True
await service._handle_context(context)
assert service._context is context
service._create_response.assert_not_awaited()
service._process_completed_function_calls.assert_awaited_once_with(
send_new_results=True
)
@pytest.mark.asyncio
async def test_tts_greeting_uses_initial_context_handler():
service = _make_service()
service._context = LLMContext()
service._handle_context = AsyncMock()
await service.process_frame(
TTSSpeakFrame("hello", append_to_context=True),
FrameDirection.DOWNSTREAM,
)
service._handle_context.assert_awaited_once_with(service._context)
service._create_response.assert_not_awaited()
@pytest.mark.asyncio
async def test_function_call_executes_immediately_when_bot_is_not_speaking():
service = _make_service()
service._context = LLMContext()
service.run_function_calls = AsyncMock()
service._pending_function_calls["call-1"] = SimpleNamespace(name="customer_support")
await service._handle_evt_function_call_arguments_done(
SimpleNamespace(call_id="call-1", arguments='{"department":"sales"}')
)
service.run_function_calls.assert_awaited_once()
assert service._deferred_function_calls == []
@pytest.mark.asyncio
async def test_function_call_is_deferred_until_bot_stops_speaking():
service = _make_service()
service._context = LLMContext()
service.run_function_calls = AsyncMock()
service._bot_is_speaking = True
service._pending_function_calls["call-1"] = SimpleNamespace(name="customer_support")
await service._handle_evt_function_call_arguments_done(
SimpleNamespace(call_id="call-1", arguments='{"department":"sales"}')
)
service.run_function_calls.assert_not_awaited()
assert len(service._deferred_function_calls) == 1
await service._run_pending_function_calls()
service.run_function_calls.assert_awaited_once()
assert service._deferred_function_calls == []

View file

@ -0,0 +1,61 @@
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.turns.user_start import (
ExternalUserTurnStartStrategy,
)
from pipecat.turns.user_start.vad_user_turn_start_strategy import (
VADUserTurnStartStrategy,
)
from pipecat.turns.user_stop import (
ExternalUserTurnStopStrategy,
SpeechTimeoutUserTurnStopStrategy,
)
from api.services.configuration.registry import ServiceProviders
from api.services.pipecat.run_pipeline import _create_realtime_user_turn_config
def test_gemini_realtime_uses_local_vad_without_local_interruptions():
strategies, vad_analyzer = _create_realtime_user_turn_config(
ServiceProviders.GOOGLE_REALTIME.value
)
assert isinstance(vad_analyzer, SileroVADAnalyzer)
assert len(strategies.start) == 1
assert isinstance(strategies.start[0], VADUserTurnStartStrategy)
assert strategies.start[0]._enable_interruptions is False
assert len(strategies.stop) == 1
assert isinstance(strategies.stop[0], SpeechTimeoutUserTurnStopStrategy)
def test_gemini_vertex_realtime_uses_same_turn_config_as_gemini_live():
strategies, vad_analyzer = _create_realtime_user_turn_config(
ServiceProviders.GOOGLE_VERTEX_REALTIME.value
)
assert isinstance(vad_analyzer, SileroVADAnalyzer)
assert len(strategies.start) == 1
assert isinstance(strategies.start[0], VADUserTurnStartStrategy)
assert strategies.start[0]._enable_interruptions is False
def test_openai_realtime_uses_provider_turn_frames_without_local_vad():
strategies, vad_analyzer = _create_realtime_user_turn_config(
ServiceProviders.OPENAI_REALTIME.value
)
assert vad_analyzer is None
assert len(strategies.start) == 1
assert isinstance(strategies.start[0], ExternalUserTurnStartStrategy)
assert strategies.start[0]._enable_interruptions is False
assert len(strategies.stop) == 1
assert isinstance(strategies.stop[0], ExternalUserTurnStopStrategy)
def test_unknown_realtime_providers_keep_local_vad():
strategies, vad_analyzer = _create_realtime_user_turn_config("other_realtime")
assert isinstance(vad_analyzer, SileroVADAnalyzer)
assert len(strategies.start) == 1
assert isinstance(strategies.start[0], VADUserTurnStartStrategy)
assert len(strategies.stop) == 1
assert isinstance(strategies.stop[0], SpeechTimeoutUserTurnStopStrategy)

View file

@ -66,7 +66,7 @@ class TestUnregisteredFunctionCall:
# Pipecat's missing-function handler returns a string error.
assert isinstance(result_frame.result, str)
assert "not registered" in result_frame.result
assert "not currently available" in result_frame.result
assert "nonexistent_tool" in result_frame.result
# In-progress frame should also be emitted before the result so mute

File diff suppressed because one or more lines are too long

View file

@ -4,4 +4,6 @@ set -euo pipefail
ruff check api --select I --select F401 --fix
ruff format api
ruff format pipecat
(cd ui && npm run fix-lint)

View file

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: dograh-openapi-XXXXXX.json.BPTCZt07wQ
# timestamp: 2026-05-13T16:22:23+00:00
# filename: dograh-openapi-XXXXXX.json.R5pvM3g7EK
# timestamp: 2026-05-16T03:12:06+00:00
from __future__ import annotations

View file

@ -11,6 +11,8 @@ import {
KeyValueEditor,
type KeyValueItem,
ParameterEditor,
PresetParameterEditor,
type PresetToolParameter,
type ToolParameter,
UrlInput,
} from "@/components/http";
@ -35,6 +37,8 @@ export interface HttpApiToolConfigProps {
onHeadersChange: (headers: KeyValueItem[]) => void;
parameters: ToolParameter[];
onParametersChange: (parameters: ToolParameter[]) => void;
presetParameters: PresetToolParameter[];
onPresetParametersChange: (parameters: PresetToolParameter[]) => void;
timeoutMs: number;
onTimeoutMsChange: (timeout: number) => void;
customMessage: string;
@ -61,6 +65,8 @@ export function HttpApiToolConfig({
onHeadersChange,
parameters,
onParametersChange,
presetParameters,
onPresetParametersChange,
timeoutMs,
onTimeoutMsChange,
customMessage,
@ -182,7 +188,7 @@ export function HttpApiToolConfig({
<TabsContent value="parameters" className="space-y-4 mt-4">
<div className="grid gap-2">
<Label>Tool Parameters</Label>
<Label>LLM Parameters</Label>
<Label className="text-xs text-muted-foreground">
Define the parameters that the LLM will provide when calling this tool.
These will be sent as JSON body for POST/PUT/PATCH or as URL query params for GET/DELETE.
@ -193,6 +199,18 @@ export function HttpApiToolConfig({
/>
</div>
<div className="grid gap-2 pt-4 border-t">
<Label>Preset Parameters</Label>
<Label className="text-xs text-muted-foreground">
Add values that Dograh should inject at runtime. These are not exposed to the LLM and can use
workflow templates like {`{{initial_context.phone_number}}`} or fixed literals.
</Label>
<PresetParameterEditor
parameters={presetParameters}
onChange={onPresetParametersChange}
/>
</div>
<div className="grid gap-2 pt-4 border-t">
<Label>Custom Headers</Label>
<Label className="text-xs text-muted-foreground">

View file

@ -11,7 +11,13 @@ import {
} from "@/client/sdk.gen";
import type { RecordingResponseSchema, ToolResponse, TransferCallConfig as APITransferCallConfig } from "@/client/types.gen";
import type { EndCallConfig } from "@/client/types.gen";
import { type HttpMethod, type KeyValueItem, type ToolParameter, validateUrl } from "@/components/http";
import {
type HttpMethod,
type KeyValueItem,
type PresetToolParameter,
type ToolParameter,
validateUrl,
} from "@/components/http";
import { Button } from "@/components/ui/button";
import {
Dialog,
@ -41,6 +47,12 @@ interface HttpApiConfigWithParams {
headers?: Record<string, string>;
credential_uuid?: string;
parameters?: ToolParameter[];
preset_parameters?: Array<{
name?: string;
type?: PresetToolParameter["type"];
value_template?: string;
required?: boolean;
}>;
timeout_ms?: number;
customMessage?: string;
}
@ -70,6 +82,7 @@ export default function ToolDetailPage() {
const [credentialUuid, setCredentialUuid] = useState("");
const [headers, setHeaders] = useState<KeyValueItem[]>([]);
const [parameters, setParameters] = useState<ToolParameter[]>([]);
const [presetParameters, setPresetParameters] = useState<PresetToolParameter[]>([]);
const [timeoutMs, setTimeoutMs] = useState(5000);
// End Call form state
@ -209,6 +222,19 @@ export default function ToolDetailPage() {
} else {
setParameters([]);
}
if (config.preset_parameters && Array.isArray(config.preset_parameters)) {
setPresetParameters(
config.preset_parameters.map((p) => ({
name: p.name || "",
type: p.type || "string",
valueTemplate: p.value_template || "",
required: p.required ?? true,
}))
);
} else {
setPresetParameters([]);
}
}
}
};
@ -263,6 +289,14 @@ export default function ToolDetailPage() {
setError("All parameters must have a name");
return;
}
const invalidPresetParams = presetParameters.filter(
(p) => !p.name.trim() || !p.valueTemplate.trim()
);
if (invalidPresetParams.length > 0) {
setError("All preset parameters must have a name and a value");
return;
}
}
try {
@ -325,6 +359,9 @@ export default function ToolDetailPage() {
});
const validParameters = parameters.filter((p) => p.name.trim());
const validPresetParameters = presetParameters.filter(
(p) => p.name.trim() && p.valueTemplate.trim()
);
requestBody = {
name,
@ -342,6 +379,15 @@ export default function ToolDetailPage() {
: undefined,
parameters:
validParameters.length > 0 ? validParameters : undefined,
preset_parameters:
validPresetParameters.length > 0
? validPresetParameters.map((p) => ({
name: p.name,
type: p.type,
value_template: p.valueTemplate,
required: p.required,
}))
: undefined,
timeout_ms: timeoutMs,
customMessage: customMessageType === 'text' ? (customMessage || undefined) : undefined,
customMessageType,
@ -394,8 +440,20 @@ export default function ToolDetailPage() {
exampleBody[p.name] = `<${p.name}>`;
}
});
presetParameters.forEach((p) => {
if (p.type === "number") {
exampleBody[p.name] = p.valueTemplate || 0;
} else if (p.type === "boolean") {
exampleBody[p.name] = p.valueTemplate || true;
} else {
exampleBody[p.name] = p.valueTemplate || `<${p.name}>`;
}
});
const hasBody = httpMethod !== "GET" && httpMethod !== "DELETE" && parameters.length > 0;
const hasBody =
httpMethod !== "GET" &&
httpMethod !== "DELETE" &&
(parameters.length > 0 || presetParameters.length > 0);
return `// ${tool.name}
// ${tool.description || "HTTP API Tool"}
@ -571,6 +629,8 @@ const data = await response.json();`;
onHeadersChange={setHeaders}
parameters={parameters}
onParametersChange={setParameters}
presetParameters={presetParameters}
onPresetParametersChange={setPresetParameters}
timeoutMs={timeoutMs}
onTimeoutMsChange={setTimeoutMs}
customMessage={customMessage}

File diff suppressed because one or more lines are too long

View file

@ -1860,6 +1860,12 @@ export type HttpApiConfig = {
* Parameters that the tool accepts from LLM
*/
parameters?: Array<ToolParameter> | null;
/**
* Preset Parameters
*
* Parameters injected by Dograh from fixed values or workflow context templates
*/
preset_parameters?: Array<PresetToolParameter> | null;
/**
* Timeout Ms
*
@ -2480,6 +2486,38 @@ export type PlivoConfigurationResponse = {
from_numbers: Array<string>;
};
/**
* PresetToolParameter
*
* A parameter injected by Dograh at runtime.
*/
export type PresetToolParameter = {
/**
* Name
*
* Parameter name (used as key in request body)
*/
name: string;
/**
* Type
*
* Parameter type: string, number, or boolean
*/
type: string;
/**
* Value Template
*
* Fixed value or template, e.g. {{initial_context.phone_number}}
*/
value_template: string;
/**
* Required
*
* Whether the parameter must resolve to a non-empty value
*/
required?: boolean;
};
/**
* PresignedUploadUrlRequest
*/

View file

@ -2,5 +2,11 @@ export { CreateCredentialDialog } from "./create-credential-dialog";
export { CredentialSelector } from "./credential-selector";
export { type HttpMethod, HttpMethodSelector } from "./http-method-selector";
export { KeyValueEditor, type KeyValueItem } from "./key-value-editor";
export { ParameterEditor, type ParameterType,type ToolParameter } from "./parameter-editor";
export { UrlInput, type UrlValidationResult,validateUrl } from "./url-input";
export {
ParameterEditor,
type ParameterType,
PresetParameterEditor,
type PresetToolParameter,
type ToolParameter,
} from "./parameter-editor";
export { UrlInput, type UrlValidationResult, validateUrl } from "./url-input";

View file

@ -23,6 +23,13 @@ export interface ToolParameter {
required: boolean;
}
export interface PresetToolParameter {
name: string;
type: ParameterType;
valueTemplate: string;
required: boolean;
}
interface ParameterEditorProps {
parameters: ToolParameter[];
onChange: (parameters: ToolParameter[]) => void;
@ -165,3 +172,146 @@ export function ParameterEditor({
</div>
);
}
interface PresetParameterEditorProps {
parameters: PresetToolParameter[];
onChange: (parameters: PresetToolParameter[]) => void;
disabled?: boolean;
}
export function PresetParameterEditor({
parameters,
onChange,
disabled = false,
}: PresetParameterEditorProps) {
const addParameter = () => {
onChange([
...parameters,
{ name: "", type: "string", valueTemplate: "", required: true },
]);
};
const updateParameter = (
index: number,
field: keyof PresetToolParameter,
value: string | boolean
) => {
const newParams = [...parameters];
newParams[index] = { ...newParams[index], [field]: value };
onChange(newParams);
};
const removeParameter = (index: number) => {
onChange(parameters.filter((_, i) => i !== index));
};
return (
<div className="space-y-4">
{parameters.length === 0 && (
<div className="text-sm text-muted-foreground py-4 text-center border border-dashed rounded-md">
No preset parameters defined. Add one to inject a fixed value or workflow context into the request.
</div>
)}
{parameters.map((param, index) => (
<div
key={index}
className="border rounded-lg p-4 space-y-3 bg-muted/20"
>
<div className="flex items-center justify-between">
<span className="text-sm font-medium text-muted-foreground">
Preset Parameter {index + 1}
</span>
<Button
variant="ghost"
size="icon"
onClick={() => removeParameter(index)}
disabled={disabled}
className="h-8 w-8"
>
<Trash2Icon className="h-4 w-4 text-muted-foreground hover:text-destructive" />
</Button>
</div>
<div className="grid grid-cols-2 gap-3">
<div className="space-y-1.5">
<Label className="text-xs">Name</Label>
<Label className="text-xs text-muted-foreground">
Key sent to the API, like &quot;phone_number&quot; or &quot;customer_id&quot;
</Label>
<Input
placeholder="e.g., phone_number"
value={param.name}
onChange={(e) =>
updateParameter(index, "name", e.target.value)
}
disabled={disabled}
/>
</div>
<div className="space-y-1.5">
<Label className="text-xs">Type</Label>
<Label className="text-xs text-muted-foreground">
JSON type to send to the API
</Label>
<Select
value={param.type}
onValueChange={(value: ParameterType) =>
updateParameter(index, "type", value)
}
disabled={disabled}
>
<SelectTrigger>
<SelectValue placeholder="Select type" />
</SelectTrigger>
<SelectContent>
<SelectItem value="string">String</SelectItem>
<SelectItem value="number">Number</SelectItem>
<SelectItem value="boolean">Boolean</SelectItem>
</SelectContent>
</Select>
</div>
</div>
<div className="space-y-1.5">
<Label className="text-xs">Value or Template</Label>
<Label className="text-xs text-muted-foreground">
Use a fixed value or a template like {`{{initial_context.phone_number}}`} or {`{{gathered_context.customer_id}}`}
</Label>
<Input
placeholder="e.g., {{initial_context.phone_number}}"
value={param.valueTemplate}
onChange={(e) =>
updateParameter(index, "valueTemplate", e.target.value)
}
disabled={disabled}
/>
</div>
<div className="flex items-center gap-2">
<Switch
id={`preset-required-${index}`}
checked={param.required}
onCheckedChange={(checked) =>
updateParameter(index, "required", checked)
}
disabled={disabled}
/>
<Label htmlFor={`preset-required-${index}`} className="text-sm">
Required
</Label>
</div>
</div>
))}
<Button
variant="outline"
size="sm"
onClick={addParameter}
className="w-fit"
disabled={disabled}
>
<PlusIcon className="h-4 w-4 mr-1" /> Add Preset Parameter
</Button>
</div>
);
}