feat: add message before tool calls (#185)

This commit is contained in:
Abhishek 2026-03-09 17:28:13 +05:30 committed by GitHub
parent 8b5a36e55c
commit ec58356276
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 126 additions and 19 deletions

View file

@ -96,6 +96,7 @@ class RFNodeDTO(BaseModel):
class EdgeDataDTO(BaseModel):
label: str = Field(..., min_length=1)
condition: str = Field(..., min_length=1)
transition_speech: Optional[str] = None
class RFEdgeDTO(BaseModel):

View file

@ -11,6 +11,7 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
FunctionCallResultProperties,
TTSSpeakFrame,
)
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
@ -94,6 +95,10 @@ class PipecatEngine:
# Controls whether user input should be muted
self._mute_pipeline: bool = False
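# Mute state for queued TTSSpeakFrames (transition speech, custom tool messages).
# "idle" = not muting; "waiting" = speech queued but the bot has not started
# speaking yet; "playing" = the bot is speaking it. Any BotStoppedSpeakingFrame
# resets the state to "idle".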
self._queued_speech_mute_state: str = "idle"
# Tracks whether the bot is currently speaking (for allow_interrupt logic)
self._bot_is_speaking: bool = False
@ -204,7 +209,12 @@ class PipecatEngine:
return render_template(prompt, self._call_context_vars)
async def _create_transition_func(self, name: str, transition_to_node: str):
async def _create_transition_func(
self,
name: str,
transition_to_node: str,
transition_speech: Optional[str] = None,
):
async def transition_func(function_call_params: FunctionCallParams) -> None:
"""Inner function that handles the node change tool calls"""
logger.info(f"LLM Function Call EXECUTED: {name}")
@ -217,6 +227,14 @@ class PipecatEngine:
# Perform variable extraction before transitioning to new node
await self._perform_variable_extraction_if_needed(self._current_node)
# Queue transition speech before switching nodes
if transition_speech:
logger.info(f"Playing transition speech: {transition_speech}")
self._queued_speech_mute_state = "waiting"
await self.task.queue_frame(
TTSSpeakFrame(transition_speech, append_to_context=False)
)
# Set the context for the new node first, so that by the time the function
# call result frame reaches LLMContextAggregator and an LLM generation runs,
# the context and functions are already updated
@ -260,14 +278,19 @@ class PipecatEngine:
return transition_func
async def _register_transition_function_with_llm(
self, name: str, transition_to_node: str
self,
name: str,
transition_to_node: str,
transition_speech: Optional[str] = None,
):
logger.debug(
f"Registering function {name} to transition to node {transition_to_node} with LLM"
)
# Create transition function
transition_func = await self._create_transition_func(name, transition_to_node)
transition_func = await self._create_transition_func(
name, transition_to_node, transition_speech
)
# Register function with LLM
self.llm.register_function(
@ -437,7 +460,9 @@ class PipecatEngine:
if not node.is_end:
for outgoing_edge in node.out_edges:
await self._register_transition_function_with_llm(
outgoing_edge.get_function_name(), outgoing_edge.target
outgoing_edge.get_function_name(),
outgoing_edge.target,
outgoing_edge.transition_speech,
)
# Register custom tool handlers for this node
@ -655,13 +680,20 @@ class PipecatEngine:
# Track bot speaking state from frames
if isinstance(frame, BotStartedSpeakingFrame):
self._bot_is_speaking = True
if self._queued_speech_mute_state == "waiting":
self._queued_speech_mute_state = "playing"
elif isinstance(frame, BotStoppedSpeakingFrame):
self._bot_is_speaking = False
self._queued_speech_mute_state = "idle"
# Always mute if pipeline is shutting down
if self._mute_pipeline:
return True
# Mute while queued speech (transition/tool message) is pending or playing
if self._queued_speech_mute_state != "idle":
return True
# Mute if bot is speaking and current node doesn't allow interruption
if self._bot_is_speaking and self._current_node:
# If we should not allow interruption, mute the pipeline

View file

@ -189,6 +189,18 @@ class CustomToolManager:
logger.info(f"Arguments: {function_call_params.arguments}")
try:
# Queue custom message before executing the API call
config = tool.definition.get("config", {}) if tool.definition else {}
custom_message = config.get("customMessage", "")
if custom_message:
logger.info(
f"Playing custom message before HTTP tool: {custom_message}"
)
self._engine._queued_speech_mute_state = "waiting"
await self._engine.task.queue_frame(
TTSSpeakFrame(custom_message, append_to_context=False)
)
result = await execute_http_tool(
tool=tool,
arguments=function_call_params.arguments,
@ -373,6 +385,7 @@ class CustomToolManager:
if message_type == "custom" and custom_message:
logger.info(f"Playing pre-transfer message: {custom_message}")
self._engine._queued_speech_mute_state = "waiting"
await self._engine.task.queue_frame(TTSSpeakFrame(custom_message))
# Get organization ID for provider configuration

View file

@ -13,6 +13,7 @@ class Edge:
self.label = data.label
self.condition = data.condition
self.transition_speech = data.transition_speech
self.data = data