feat: Add end call reason in tool calls.

This commit is contained in:
Abhishek Kumar 2026-02-21 14:21:39 +05:30
parent e111cbb36d
commit 7e2de092ae
13 changed files with 391 additions and 182 deletions

View file

@ -55,6 +55,16 @@ class EndCallConfig(BaseModel):
customMessage: Optional[str] = Field( customMessage: Optional[str] = Field(
default=None, description="Custom message to play before ending the call" default=None, description="Custom message to play before ending the call"
) )
endCallReason: bool = Field(
default=False,
description="When enabled, LLM must provide a reason for ending the call. "
"The reason is set as call disposition and added to call tags.",
)
endCallReasonDescription: Optional[str] = Field(
default=None,
description="Description shown to the LLM for the reason parameter. "
"Used only when endCallReason is enabled.",
)
class TransferCallConfig(BaseModel): class TransferCallConfig(BaseModel):

View file

@ -7,7 +7,6 @@ from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.in_memory_buffers import ( from api.services.pipecat.in_memory_buffers import (
InMemoryAudioBuffer, InMemoryAudioBuffer,
InMemoryLogsBuffer, InMemoryLogsBuffer,
InMemoryTranscriptBuffer,
) )
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine import PipecatEngine
@ -32,7 +31,7 @@ def register_event_handlers(
"""Register all event handlers for transport and task events. """Register all event handlers for transport and task events.
Returns: Returns:
Tuple of (in_memory_audio_buffer, in_memory_transcript_buffer) for use by other handlers. in_memory_audio_buffer for use by other handlers.
""" """
# Initialize in-memory buffers with proper audio configuration # Initialize in-memory buffers with proper audio configuration
sample_rate = audio_config.pipeline_sample_rate if audio_config else 16000 sample_rate = audio_config.pipeline_sample_rate if audio_config else 16000
@ -48,8 +47,6 @@ def register_event_handlers(
sample_rate=sample_rate, sample_rate=sample_rate,
num_channels=num_channels, num_channels=num_channels,
) )
in_memory_transcript_buffer = InMemoryTranscriptBuffer(workflow_run_id)
# Track both events to ensure LLM is only triggered after both occur # Track both events to ensure LLM is only triggered after both occur
ready_state = { ready_state = {
"pipeline_started": False, "pipeline_started": False,
@ -123,23 +120,22 @@ def register_event_handlers(
gathered_context = {**gathered_context, **workflow_run.gathered_context} gathered_context = {**gathered_context, **workflow_run.gathered_context}
# Set user_speech call tag # Set user_speech call tag
if in_memory_transcript_buffer: call_tags = gathered_context.get("call_tags", [])
call_tags = gathered_context.get("call_tags", [])
try: try:
has_user_speech = in_memory_transcript_buffer.contains_user_speech() has_user_speech = in_memory_logs_buffer.contains_user_speech()
except Exception: except Exception:
has_user_speech = False has_user_speech = False
if has_user_speech and "user_speech" not in call_tags: if has_user_speech and "user_speech" not in call_tags:
call_tags.append("user_speech") call_tags.append("user_speech")
# Append any keys from gathered_context that start with 'tag_' to call_tags # Append any keys from gathered_context that start with 'tag_' to call_tags
for key in gathered_context: for key in gathered_context:
if key.startswith("tag_") and key not in call_tags: if key.startswith("tag_") and key not in call_tags:
call_tags.append(gathered_context[key]) call_tags.append(gathered_context[key])
gathered_context["call_tags"] = call_tags gathered_context["call_tags"] = call_tags
# Clean up engine resources (including voicemail detector) # Clean up engine resources (including voicemail detector)
await engine.cleanup() await engine.cleanup()
@ -213,12 +209,9 @@ def register_event_handlers(
else: else:
logger.debug("Audio buffer is empty, skipping upload") logger.debug("Audio buffer is empty, skipping upload")
if not in_memory_transcript_buffer.is_empty: transcript_temp_path = in_memory_logs_buffer.write_transcript_to_temp_file()
transcript_temp_path = ( if not transcript_temp_path:
await in_memory_transcript_buffer.write_to_temp_file() logger.debug("No transcript events in logs buffer, skipping upload")
)
else:
logger.debug("Transcript buffer is empty, skipping upload")
except Exception as e: except Exception as e:
logger.error(f"Error preparing buffers for S3 upload: {e}", exc_info=True) logger.error(f"Error preparing buffers for S3 upload: {e}", exc_info=True)
@ -233,8 +226,8 @@ def register_event_handlers(
transcript_temp_path, transcript_temp_path,
) )
# Return the buffers so they can be passed to other handlers # Return the buffer so it can be passed to other handlers
return in_memory_audio_buffer, in_memory_transcript_buffer return in_memory_audio_buffer
def register_audio_data_handler( def register_audio_data_handler(
@ -256,28 +249,3 @@ def register_audio_data_handler(
except MemoryError as e: except MemoryError as e:
logger.error(f"Memory buffer full: {e}") logger.error(f"Memory buffer full: {e}")
# Could implement overflow to disk here if needed # Could implement overflow to disk here if needed
def register_transcript_handlers(
user_aggregator,
assistant_aggregator,
workflow_run_id,
in_memory_buffer: InMemoryTranscriptBuffer,
):
"""Register event handlers for transcript updates on context aggregators.
Uses the on_user_turn_stopped and on_assistant_turn_stopped events to capture
transcripts as turns complete, following the event-based pattern.
"""
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}user: {message.content}\n"
await in_memory_buffer.append(line)
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}assistant: {message.content}\n"
await in_memory_buffer.append(line)

View file

@ -1,12 +1,13 @@
import asyncio import asyncio
import re
import tempfile import tempfile
import wave import wave
from datetime import UTC, datetime from datetime import UTC, datetime
from typing import List from typing import List, Optional
from loguru import logger from loguru import logger
from pipecat.utils.enums import RealtimeFeedbackType
class InMemoryAudioBuffer: class InMemoryAudioBuffer:
"""Buffer audio data in memory during a call, then write to temp file on disconnect.""" """Buffer audio data in memory during a call, then write to temp file on disconnect."""
@ -69,60 +70,6 @@ class InMemoryAudioBuffer:
return self._total_size return self._total_size
class InMemoryTranscriptBuffer:
"""Buffer transcript data in memory during a call, then write to temp file on disconnect."""
# Compiled regex to identify user speech lines, e.g.
# [2025-06-29T12:34:56.789+00:00] user: hello
_USER_SPEECH_RE: re.Pattern[str] = re.compile(
r"^\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}\+\d{2}:\d{2}\] user: .+"
)
def __init__(self, workflow_run_id: int):
self._workflow_run_id = workflow_run_id
self._lines: List[str] = []
self._lock = asyncio.Lock()
async def append(self, transcript: str):
"""Append transcript text to the buffer."""
async with self._lock:
self._lines.append(transcript)
logger.trace(
f"Appended transcript line to buffer for workflow {self._workflow_run_id}"
)
async def write_to_temp_file(self) -> str:
"""Write transcript to a temporary text file and return the path."""
async with self._lock:
temp_file = tempfile.NamedTemporaryFile(
mode="w", suffix=".txt", delete=False
)
logger.debug(
f"Writing transcript buffer to temp file {temp_file.name} for workflow {self._workflow_run_id}"
)
content = "".join(self._lines)
temp_file.write(content)
temp_file.close()
logger.info(
f"Successfully wrote {len(content)} chars of transcript to {temp_file.name}"
)
return temp_file.name
@property
def is_empty(self) -> bool:
"""Check if the buffer is empty."""
return len(self._lines) == 0
def contains_user_speech(self) -> bool:
"""Return True if any buffered transcript line matches the user speech pattern."""
for line in self._lines:
if self._USER_SPEECH_RE.match(line):
return True
return False
class InMemoryLogsBuffer: class InMemoryLogsBuffer:
"""Buffer real-time feedback events in memory during a call, then save to workflow run logs.""" """Buffer real-time feedback events in memory during a call, then save to workflow run logs."""
@ -130,15 +77,36 @@ class InMemoryLogsBuffer:
self._workflow_run_id = workflow_run_id self._workflow_run_id = workflow_run_id
self._events: List[dict] = [] self._events: List[dict] = []
self._turn_counter = 0 self._turn_counter = 0
self._current_node_id: Optional[str] = None
self._current_node_name: Optional[str] = None
def set_current_node(self, node_id: str, node_name: str):
"""Set the current node ID and name to be injected into subsequent events."""
self._current_node_id = node_id
self._current_node_name = node_name
@property
def current_node_id(self) -> Optional[str]:
"""Get the current node ID."""
return self._current_node_id
@property
def current_node_name(self) -> Optional[str]:
"""Get the current node name."""
return self._current_node_name
async def append(self, event: dict): async def append(self, event: dict):
"""Append a feedback event to the buffer with timestamp.""" """Append a feedback event to the buffer with timestamp and current node."""
# Add timestamp and turn tracking # Add timestamp, turn tracking, and current node
timestamped_event = { timestamped_event = {
**event, **event,
"timestamp": datetime.now(UTC).isoformat(), "timestamp": datetime.now(UTC).isoformat(),
"turn": self._turn_counter, "turn": self._turn_counter,
} }
if self._current_node_id:
timestamped_event["node_id"] = self._current_node_id
if self._current_node_name:
timestamped_event["node_name"] = self._current_node_name
self._events.append(timestamped_event) self._events.append(timestamped_event)
logger.trace( logger.trace(
f"Appended event {event.get('type')} to logs buffer for workflow {self._workflow_run_id}" f"Appended event {event.get('type')} to logs buffer for workflow {self._workflow_run_id}"
@ -155,6 +123,63 @@ class InMemoryLogsBuffer:
"""Get all events for final storage.""" """Get all events for final storage."""
return self._events return self._events
def contains_user_speech(self) -> bool:
"""Return True if any final user transcription event has non-empty text."""
for event in self._events:
if (
event.get("type") == RealtimeFeedbackType.USER_TRANSCRIPTION.value
and event.get("payload", {}).get("final") is True
and event.get("payload", {}).get("text")
):
return True
return False
def generate_transcript_text(self) -> str:
"""Generate transcript text from logged events.
Filters for rtf-user-transcription (final) and rtf-bot-text events,
formats them as '[timestamp] user/assistant: text\n'.
"""
lines: List[str] = []
for event in self._events:
event_type = event.get("type")
payload = event.get("payload", {})
if (
event_type == RealtimeFeedbackType.USER_TRANSCRIPTION.value
and payload.get("final") is True
):
timestamp = payload.get("timestamp", "")
prefix = f"[{timestamp}] " if timestamp else ""
lines.append(f"{prefix}user: {payload.get('text', '')}\n")
elif event_type == RealtimeFeedbackType.BOT_TEXT.value:
timestamp = payload.get("timestamp", "")
prefix = f"[{timestamp}] " if timestamp else ""
lines.append(f"{prefix}assistant: {payload.get('text', '')}\n")
return "".join(lines)
def write_transcript_to_temp_file(self) -> Optional[str]:
"""Write transcript to a temporary text file and return the path.
Returns None if there are no transcript events.
"""
content = self.generate_transcript_text()
if not content:
return None
temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False)
logger.debug(
f"Writing transcript to temp file {temp_file.name} for workflow {self._workflow_run_id}"
)
temp_file.write(content)
temp_file.close()
logger.info(
f"Successfully wrote {len(content)} chars of transcript to {temp_file.name}"
)
return temp_file.name
@property @property
def is_empty(self) -> bool: def is_empty(self) -> bool:
"""Check if the buffer is empty.""" """Check if the buffer is empty."""

View file

@ -8,6 +8,13 @@ For frames with presentation timestamps (pts), like TTSTextFrame, we respect
the timing by queuing them and sending at the appropriate time, similar to the timing by queuing them and sending at the appropriate time, similar to
how base_output.py handles timed frames. how base_output.py handles timed frames.
Streaming vs. persisted data:
- WebSocket receives all events in real-time (interim transcriptions, TTS text
chunks, function calls, metrics) for live UI feedback.
- The logs buffer only stores final complete transcripts per turn (via
register_turn_handlers hooking into aggregator events), function calls,
and metrics not interim/streaming data.
Note: Node transition events are sent directly from PipecatEngine.set_node() Note: Node transition events are sent directly from PipecatEngine.set_node()
rather than being observed here, to ensure precise timing at the moment of rather than being observed here, to ensure precise timing at the moment of
node changes. node changes.
@ -37,17 +44,23 @@ from pipecat.frames.frames import (
from pipecat.metrics.metrics import TTFBMetricsData from pipecat.metrics.metrics import TTFBMetricsData
from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection from pipecat.processors.frame_processor import FrameDirection
from pipecat.utils.enums import RealtimeFeedbackType
from pipecat.utils.time import nanoseconds_to_seconds from pipecat.utils.time import nanoseconds_to_seconds
class RealtimeFeedbackObserver(BaseObserver): class RealtimeFeedbackObserver(BaseObserver):
"""Observer that sends real-time transcription, bot response, and metrics via WebSocket. """Observer that sends real-time events via WebSocket and persists final transcripts.
Observes pipeline frames and sends events for: WebSocket streaming (all events for live UI):
- User transcriptions (interim and final) - User transcriptions (interim and final)
- Bot TTS text (with pts-based timing) - Bot TTS text (with pts-based timing)
- Function calls (start/end) - Function calls (start/end)
- TTFB metrics (LLM generation time only - filters to processors containing "LLM") - TTFB metrics (LLM generation time only)
Logs buffer persistence (only final data for post-call analysis):
- Complete user transcripts per turn (via on_user_turn_stopped)
- Complete assistant transcripts per turn (via on_assistant_turn_stopped)
- Function calls and TTFB metrics
For frames with pts (presentation timestamp), we queue them and send at the For frames with pts (presentation timestamp), we queue them and send at the
appropriate time to sync with audio playback. appropriate time to sync with audio playback.
@ -134,8 +147,8 @@ class RealtimeFeedbackObserver(BaseObserver):
if target_time > current_time: if target_time > current_time:
await asyncio.sleep(target_time - current_time) await asyncio.sleep(target_time - current_time)
# Send the message # Send the message (clock queue only has TTS text, WS-only)
await self._send_message(message) await self._send_ws(message)
self._clock_queue.task_done() self._clock_queue.task_done()
except asyncio.CancelledError: except asyncio.CancelledError:
break break
@ -164,11 +177,11 @@ class RealtimeFeedbackObserver(BaseObserver):
return return
self._frames_seen.add(frame.id) self._frames_seen.add(frame.id)
# Handle user transcriptions (interim) # Handle user transcriptions (interim) - WebSocket only
if isinstance(frame, InterimTranscriptionFrame): if isinstance(frame, InterimTranscriptionFrame):
await self._send_message( await self._send_ws(
{ {
"type": "rtf-user-transcription", "type": RealtimeFeedbackType.USER_TRANSCRIPTION.value,
"payload": { "payload": {
"text": frame.text, "text": frame.text,
"final": False, "final": False,
@ -177,11 +190,12 @@ class RealtimeFeedbackObserver(BaseObserver):
}, },
} }
) )
# Handle user transcriptions (final) # Handle user transcriptions (final) - WebSocket only
# Complete turn text is persisted via register_turn_handlers
elif isinstance(frame, TranscriptionFrame): elif isinstance(frame, TranscriptionFrame):
await self._send_message( await self._send_ws(
{ {
"type": "rtf-user-transcription", "type": RealtimeFeedbackType.USER_TRANSCRIPTION.value,
"payload": { "payload": {
"text": frame.text, "text": frame.text,
"final": True, "final": True,
@ -190,13 +204,11 @@ class RealtimeFeedbackObserver(BaseObserver):
}, },
} }
) )
# Increment turn counter on final user transcription # Handle bot TTS text - respect pts timing, WebSocket only
if self._logs_buffer: # Complete turn text is persisted via register_turn_handlers
self._logs_buffer.increment_turn()
# Handle bot TTS text - respect pts timing
elif isinstance(frame, TTSTextFrame): elif isinstance(frame, TTSTextFrame):
message = { message = {
"type": "rtf-bot-text", "type": RealtimeFeedbackType.BOT_TEXT.value,
"payload": { "payload": {
"text": frame.text, "text": frame.text,
}, },
@ -213,7 +225,7 @@ class RealtimeFeedbackObserver(BaseObserver):
await self._clock_queue.put((frame.pts, frame.id, message)) await self._clock_queue.put((frame.pts, frame.id, message))
else: else:
# No pts, send immediately # No pts, send immediately
await self._send_message(message) await self._send_ws(message)
# Handle function call in progress # Handle function call in progress
elif ( elif (
isinstance(frame, FunctionCallInProgressFrame) isinstance(frame, FunctionCallInProgressFrame)
@ -221,7 +233,7 @@ class RealtimeFeedbackObserver(BaseObserver):
): ):
await self._send_message( await self._send_message(
{ {
"type": "rtf-function-call-start", "type": RealtimeFeedbackType.FUNCTION_CALL_START.value,
"payload": { "payload": {
"function_name": frame.function_name, "function_name": frame.function_name,
"tool_call_id": frame.tool_call_id, "tool_call_id": frame.tool_call_id,
@ -235,7 +247,7 @@ class RealtimeFeedbackObserver(BaseObserver):
): ):
await self._send_message( await self._send_message(
{ {
"type": "rtf-function-call-end", "type": RealtimeFeedbackType.FUNCTION_CALL_END.value,
"payload": { "payload": {
"function_name": frame.function_name, "function_name": frame.function_name,
"tool_call_id": frame.tool_call_id, "tool_call_id": frame.tool_call_id,
@ -252,7 +264,7 @@ class RealtimeFeedbackObserver(BaseObserver):
if metric_data.processor and "LLM" in metric_data.processor: if metric_data.processor and "LLM" in metric_data.processor:
await self._send_message( await self._send_message(
{ {
"type": "rtf-ttfb-metric", "type": RealtimeFeedbackType.TTFB_METRIC.value,
"payload": { "payload": {
"ttfb_seconds": metric_data.value, "ttfb_seconds": metric_data.value,
"processor": metric_data.processor, "processor": metric_data.processor,
@ -261,18 +273,77 @@ class RealtimeFeedbackObserver(BaseObserver):
} }
) )
async def _send_message(self, message: dict): async def _send_ws(self, message: dict):
"""Send message via WebSocket AND append to logs buffer, handling errors gracefully.""" """Send message via WebSocket only, handling errors gracefully."""
# Send via WebSocket if not self._ws_sender:
return
try: try:
# Inject current node info from the logs buffer
if self._logs_buffer and self._logs_buffer.current_node_id:
message = {
**message,
"node_id": self._logs_buffer.current_node_id,
"node_name": self._logs_buffer.current_node_name,
}
await self._ws_sender(message) await self._ws_sender(message)
except Exception as e: except Exception as e:
# Log but don't fail - feedback is non-critical
logger.debug(f"Failed to send real-time feedback message: {e}") logger.debug(f"Failed to send real-time feedback message: {e}")
# Also append to logs buffer async def _send_message(self, message: dict):
"""Send message via WebSocket AND append to logs buffer."""
await self._send_ws(message)
await self._append_to_buffer(message)
async def _append_to_buffer(self, message: dict):
"""Append message to logs buffer, handling errors gracefully."""
if self._logs_buffer: if self._logs_buffer:
try: try:
await self._logs_buffer.append(message) await self._logs_buffer.append(message)
except Exception as e: except Exception as e:
logger.error(f"Failed to append to logs buffer: {e}") logger.error(f"Failed to append to logs buffer: {e}")
def register_turn_log_handlers(
logs_buffer: "InMemoryLogsBuffer",
user_aggregator,
assistant_aggregator,
):
"""Register event handlers on aggregators to persist final turn transcripts.
Hooks into on_user_turn_stopped and on_assistant_turn_stopped to store
complete turn text in the logs buffer. Works for both WebRTC and telephony
calls independent of WebSocket availability.
"""
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message):
logs_buffer.increment_turn()
try:
await logs_buffer.append(
{
"type": RealtimeFeedbackType.USER_TRANSCRIPTION.value,
"payload": {
"text": message.content,
"final": True,
"timestamp": message.timestamp,
},
}
)
except Exception as e:
logger.error(f"Failed to append user turn to logs buffer: {e}")
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message):
if message.content:
try:
await logs_buffer.append(
{
"type": RealtimeFeedbackType.BOT_TEXT.value,
"payload": {
"text": message.content,
"timestamp": message.timestamp,
},
}
)
except Exception as e:
logger.error(f"Failed to append assistant turn to logs buffer: {e}")

View file

@ -12,7 +12,6 @@ from api.services.pipecat.audio_config import AudioConfig, create_audio_config
from api.services.pipecat.event_handlers import ( from api.services.pipecat.event_handlers import (
register_audio_data_handler, register_audio_data_handler,
register_event_handlers, register_event_handlers,
register_transcript_handlers,
) )
from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer
from api.services.pipecat.pipeline_builder import ( from api.services.pipecat.pipeline_builder import (
@ -24,7 +23,10 @@ from api.services.pipecat.pipeline_engine_callbacks_processor import (
PipelineEngineCallbacksProcessor, PipelineEngineCallbacksProcessor,
) )
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
from api.services.pipecat.realtime_feedback_observer import RealtimeFeedbackObserver from api.services.pipecat.realtime_feedback_observer import (
RealtimeFeedbackObserver,
register_turn_log_handlers,
)
from api.services.pipecat.service_factory import ( from api.services.pipecat.service_factory import (
create_llm_service, create_llm_service,
create_stt_service, create_stt_service,
@ -73,7 +75,7 @@ from pipecat.turns.user_stop import (
TurnAnalyzerUserTurnStopStrategy, TurnAnalyzerUserTurnStopStrategy,
) )
from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.enums import EndTaskReason from pipecat.utils.enums import EndTaskReason, RealtimeFeedbackType
from pipecat.utils.run_context import set_current_run_id from pipecat.utils.run_context import set_current_run_id
from pipecat.utils.tracing.context_registry import ContextProviderRegistry from pipecat.utils.tracing.context_registry import ContextProviderRegistry
@ -511,35 +513,42 @@ async def _run_pipeline(
# Create in-memory logs buffer early so it can be used by engine callbacks # Create in-memory logs buffer early so it can be used by engine callbacks
in_memory_logs_buffer = InMemoryLogsBuffer(workflow_run_id) in_memory_logs_buffer = InMemoryLogsBuffer(workflow_run_id)
# Create node transition callback if WebSocket sender is available # Create node transition callback (always logs to buffer, optionally streams to WS)
node_transition_callback = None
ws_sender = get_ws_sender(workflow_run_id) ws_sender = get_ws_sender(workflow_run_id)
if ws_sender:
async def send_node_transition( async def send_node_transition(
node_name: str, previous_node: Optional[str] node_id: str,
) -> None: node_name: str,
"""Send node transition event via WebSocket AND log to buffer.""" previous_node_id: Optional[str],
message = { previous_node_name: Optional[str],
"type": "rtf-node-transition", ) -> None:
"payload": { """Send node transition event to logs buffer and optionally via WebSocket."""
"node_name": node_name, # Update current node on the buffer so subsequent events are tagged
"previous_node": previous_node, in_memory_logs_buffer.set_current_node(node_id, node_name)
},
} message = {
# Send via WebSocket "type": RealtimeFeedbackType.NODE_TRANSITION.value,
"payload": {
"node_id": node_id,
"node_name": node_name,
"previous_node_id": previous_node_id,
"previous_node_name": previous_node_name,
},
}
# Send via WebSocket if available
if ws_sender:
try: try:
await ws_sender(message) await ws_sender({**message, "node_id": node_id, "node_name": node_name})
except Exception as e: except Exception as e:
logger.debug(f"Failed to send node transition via WebSocket: {e}") logger.debug(f"Failed to send node transition via WebSocket: {e}")
# Log to in-memory buffer # Always log to in-memory buffer (node_id/node_name injected by buffer's append)
try: try:
await in_memory_logs_buffer.append(message) await in_memory_logs_buffer.append(message)
except Exception as e: except Exception as e:
logger.error(f"Failed to append node transition to logs buffer: {e}") logger.error(f"Failed to append node transition to logs buffer: {e}")
node_transition_callback = send_node_transition node_transition_callback = send_node_transition
# Extract embeddings configuration from user config # Extract embeddings configuration from user config
embeddings_api_key = None embeddings_api_key = None
@ -694,17 +703,48 @@ async def _run_pipeline(
# Initialize the engine to set the initial context # Initialize the engine to set the initial context
await engine.initialize() await engine.initialize()
# Add real-time feedback observer if WebSocket sender is available # Add real-time feedback observer (always logs to buffer, streams to WS if available)
# Note: ws_sender was already fetched earlier for node_transition_callback feedback_observer = RealtimeFeedbackObserver(
if ws_sender: ws_sender=ws_sender,
feedback_observer = RealtimeFeedbackObserver( logs_buffer=in_memory_logs_buffer,
ws_sender=ws_sender, )
logs_buffer=in_memory_logs_buffer, task.add_observer(feedback_observer)
)
task.add_observer(feedback_observer) # Register latency observer to log user-to-bot response latency
if task.user_bot_latency_observer:
@task.user_bot_latency_observer.event_handler("on_latency_measured")
async def on_latency_measured(observer, latency_seconds):
message = {
"type": RealtimeFeedbackType.LATENCY_MEASURED.value,
"payload": {
"latency_seconds": latency_seconds,
},
}
if ws_sender:
try:
ws_message = message
if in_memory_logs_buffer.current_node_id:
ws_message = {
**message,
"node_id": in_memory_logs_buffer.current_node_id,
"node_name": in_memory_logs_buffer.current_node_name,
}
await ws_sender(ws_message)
except Exception as e:
logger.debug(f"Failed to send latency via WebSocket: {e}")
try:
await in_memory_logs_buffer.append(message)
except Exception as e:
logger.error(f"Failed to append latency to logs buffer: {e}")
# Register turn log handlers for all call types (WebRTC and telephony)
register_turn_log_handlers(
in_memory_logs_buffer, user_context_aggregator, assistant_context_aggregator
)
# Register event handlers # Register event handlers
in_memory_audio_buffer, in_memory_transcript_buffer = register_event_handlers( in_memory_audio_buffer = register_event_handlers(
task, task,
transport, transport,
workflow_run_id, workflow_run_id,
@ -716,12 +756,6 @@ async def _run_pipeline(
) )
register_audio_data_handler(audio_buffer, workflow_run_id, in_memory_audio_buffer) register_audio_data_handler(audio_buffer, workflow_run_id, in_memory_audio_buffer)
register_transcript_handlers(
user_context_aggregator,
assistant_context_aggregator,
workflow_run_id,
in_memory_transcript_buffer,
)
try: try:
# Run the pipeline # Run the pipeline

View file

@ -63,7 +63,7 @@ class PipecatEngine:
call_context_vars: dict, call_context_vars: dict,
workflow_run_id: Optional[int] = None, workflow_run_id: Optional[int] = None,
node_transition_callback: Optional[ node_transition_callback: Optional[
Callable[[str, Optional[str]], Awaitable[None]] Callable[[str, str, Optional[str], Optional[str]], Awaitable[None]]
] = None, ] = None,
embeddings_api_key: Optional[str] = None, embeddings_api_key: Optional[str] = None,
embeddings_model: Optional[str] = None, embeddings_model: Optional[str] = None,
@ -456,14 +456,22 @@ class PipecatEngine:
# Track previous node for transition event # Track previous node for transition event
previous_node_name = self._current_node.name if self._current_node else None previous_node_name = self._current_node.name if self._current_node else None
previous_node_id = self._current_node.id if self._current_node else None
# Set current node for all nodes (including static ones) so STT mute filter works # Set current node for all nodes (including static ones) so STT mute filter works
self._current_node = node self._current_node = node
# Track visited nodes in gathered context for call tags
nodes_visited = self._gathered_context.setdefault("nodes_visited", [])
if node.name not in nodes_visited:
nodes_visited.append(node.name)
# Send node transition event if callback is provided # Send node transition event if callback is provided
if self._node_transition_callback: if self._node_transition_callback:
try: try:
await self._node_transition_callback(node.name, previous_node_name) await self._node_transition_callback(
node_id, node.name, previous_node_id, previous_node_name
)
except Exception as e: except Exception as e:
# Log but don't fail - feedback is non-critical # Log but don't fail - feedback is non-critical
logger.debug(f"Failed to send node transition event: {e}") logger.debug(f"Failed to send node transition event: {e}")

View file

@ -231,6 +231,20 @@ class CustomToolManager:
message_type = config.get("messageType", "none") message_type = config.get("messageType", "none")
custom_message = config.get("customMessage", "") custom_message = config.get("customMessage", "")
# Handle end call reason if enabled
end_call_reason_enabled = config.get("endCallReason", False)
if end_call_reason_enabled:
reason = (
function_call_params.arguments.get("reason", "")
or "end_call_tool"
)
logger.info(f"End call reason: {reason}")
self._engine._gathered_context["call_disposition"] = reason
call_tags = self._engine._gathered_context.get("call_tags", [])
if reason not in call_tags:
call_tags.extend([reason, "end_call_tool"])
self._engine._gathered_context["call_tags"] = call_tags
# Send result callback first # Send result callback first
await function_call_params.result_callback( await function_call_params.result_callback(
{"status": "success", "action": "ending_call"}, {"status": "success", "action": "ending_call"},

View file

@ -51,6 +51,19 @@ def tool_to_function_schema(tool: Any) -> Dict[str, Any]:
if param_required: if param_required:
required.append(param_name) required.append(param_name)
# If this is an end_call tool with endCallReason enabled, add a required 'reason' parameter
if definition.get("type") == "end_call" and config.get("endCallReason", False):
default_description = (
"The reason for ending the call (e.g., 'voicemail_detected', "
"'issue_resolved', 'customer_requested')"
)
properties["reason"] = {
"type": "string",
"description": config.get("endCallReasonDescription")
or default_description,
}
required.append("reason")
# Sanitize tool name for function name (lowercase, underscores only) # Sanitize tool name for function name (lowercase, underscores only)
function_name = re.sub(r"[^a-z0-9_]", "_", tool.name.lower()) function_name = re.sub(r"[^a-z0-9_]", "_", tool.name.lower())
# Remove consecutive underscores and trim # Remove consecutive underscores and trim

@ -1 +1 @@
Subproject commit fbc9a768445e8f683721744659fc8904d4012081 Subproject commit 6aa0502a9834d536aba9589cec87d827e66f2fad

View file

@ -4,6 +4,7 @@ import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/com
import { Input } from "@/components/ui/input"; import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label"; import { Label } from "@/components/ui/label";
import { RadioGroup, RadioGroupItem } from "@/components/ui/radio-group"; import { RadioGroup, RadioGroupItem } from "@/components/ui/radio-group";
import { Switch } from "@/components/ui/switch";
import { Textarea } from "@/components/ui/textarea"; import { Textarea } from "@/components/ui/textarea";
import { type EndCallMessageType } from "../../config"; import { type EndCallMessageType } from "../../config";
@ -17,6 +18,10 @@ export interface EndCallToolConfigProps {
onMessageTypeChange: (messageType: EndCallMessageType) => void; onMessageTypeChange: (messageType: EndCallMessageType) => void;
customMessage: string; customMessage: string;
onCustomMessageChange: (message: string) => void; onCustomMessageChange: (message: string) => void;
endCallReason: boolean;
onEndCallReasonChange: (enabled: boolean) => void;
endCallReasonDescription: string;
onEndCallReasonDescriptionChange: (description: string) => void;
} }
export function EndCallToolConfig({ export function EndCallToolConfig({
@ -28,6 +33,10 @@ export function EndCallToolConfig({
onMessageTypeChange, onMessageTypeChange,
customMessage, customMessage,
onCustomMessageChange, onCustomMessageChange,
endCallReason,
onEndCallReasonChange,
endCallReasonDescription,
onEndCallReasonDescriptionChange,
}: EndCallToolConfigProps) { }: EndCallToolConfigProps) {
return ( return (
<Card> <Card>
@ -63,6 +72,35 @@ export function EndCallToolConfig({
/> />
</div> </div>
<div className="grid gap-2 pt-4 border-t">
<div className="flex items-center space-x-2">
<Switch
id="end-call-reason"
checked={endCallReason}
onCheckedChange={onEndCallReasonChange}
/>
<Label htmlFor="end-call-reason">Capture End Call Reason</Label>
</div>
<Label className="text-xs text-muted-foreground">
When enabled, the AI will provide a reason for ending the call.
The reason will be set as the call disposition and added to call tags for analytics.
</Label>
{endCallReason && (
<div className="grid gap-2 pt-2">
<Label>Reason Description</Label>
<Label className="text-xs text-muted-foreground">
Instructions shown to the AI for what kind of reason to provide
</Label>
<Textarea
value={endCallReasonDescription}
onChange={(e) => onEndCallReasonDescriptionChange(e.target.value)}
placeholder="e.g., The reason for ending the call (e.g., 'voicemail_detected', 'issue_resolved', 'customer_requested')"
rows={2}
/>
</div>
)}
</div>
<div className="grid gap-4 pt-4 border-t"> <div className="grid gap-4 pt-4 border-t">
<Label>Goodbye Message</Label> <Label>Goodbye Message</Label>
<Label className="text-xs text-muted-foreground"> <Label className="text-xs text-muted-foreground">

View file

@ -9,6 +9,7 @@ import {
updateToolApiV1ToolsToolUuidPut, updateToolApiV1ToolsToolUuidPut,
} from "@/client/sdk.gen"; } from "@/client/sdk.gen";
import type { ToolResponse, TransferCallConfig as APITransferCallConfig } from "@/client/types.gen"; import type { ToolResponse, TransferCallConfig as APITransferCallConfig } from "@/client/types.gen";
import type { EndCallConfig } from "@/client/types.gen";
import { type HttpMethod, type KeyValueItem, type ToolParameter, validateUrl } from "@/components/http"; import { type HttpMethod, type KeyValueItem, type ToolParameter, validateUrl } from "@/components/http";
import { Button } from "@/components/ui/button"; import { Button } from "@/components/ui/button";
import { import {
@ -22,7 +23,7 @@ import { Skeleton } from "@/components/ui/skeleton";
import { useAuth } from "@/lib/auth"; import { useAuth } from "@/lib/auth";
import { import {
type EndCallConfig, DEFAULT_END_CALL_REASON_DESCRIPTION,
type EndCallMessageType, type EndCallMessageType,
getCategoryConfig, getCategoryConfig,
getToolTypeLabel, getToolTypeLabel,
@ -68,6 +69,15 @@ export default function ToolDetailPage() {
// End Call form state // End Call form state
const [endCallMessageType, setEndCallMessageType] = useState<EndCallMessageType>("none"); const [endCallMessageType, setEndCallMessageType] = useState<EndCallMessageType>("none");
const [endCallCustomMessage, setEndCallCustomMessage] = useState(""); const [endCallCustomMessage, setEndCallCustomMessage] = useState("");
const [endCallReason, setEndCallReason] = useState(false);
const [endCallReasonDescription, setEndCallReasonDescription] = useState("");
const handleEndCallReasonChange = (enabled: boolean) => {
setEndCallReason(enabled);
if (enabled && !endCallReasonDescription) {
setEndCallReasonDescription(DEFAULT_END_CALL_REASON_DESCRIPTION);
}
};
// Transfer Call form state // Transfer Call form state
const [transferDestination, setTransferDestination] = useState(""); const [transferDestination, setTransferDestination] = useState("");
@ -119,9 +129,13 @@ export default function ToolDetailPage() {
if (config) { if (config) {
setEndCallMessageType(config.messageType || "none"); setEndCallMessageType(config.messageType || "none");
setEndCallCustomMessage(config.customMessage || ""); setEndCallCustomMessage(config.customMessage || "");
setEndCallReason(config.endCallReason ?? false);
setEndCallReasonDescription(config.endCallReasonDescription || "");
} else { } else {
setEndCallMessageType("none"); setEndCallMessageType("none");
setEndCallCustomMessage(""); setEndCallCustomMessage("");
setEndCallReason(false);
setEndCallReasonDescription("");
} }
} else if (tool.category === "transfer_call") { } else if (tool.category === "transfer_call") {
// Populate transfer call specific fields // Populate transfer call specific fields
@ -225,6 +239,8 @@ export default function ToolDetailPage() {
config: { config: {
messageType: endCallMessageType, messageType: endCallMessageType,
customMessage: endCallMessageType === "custom" ? endCallCustomMessage : undefined, customMessage: endCallMessageType === "custom" ? endCallCustomMessage : undefined,
endCallReason,
endCallReasonDescription: endCallReason ? endCallReasonDescription || undefined : undefined,
}, },
}, },
}; };
@ -432,6 +448,10 @@ const data = await response.json();`;
onMessageTypeChange={setEndCallMessageType} onMessageTypeChange={setEndCallMessageType}
customMessage={endCallCustomMessage} customMessage={endCallCustomMessage}
onCustomMessageChange={setEndCallCustomMessage} onCustomMessageChange={setEndCallCustomMessage}
endCallReason={endCallReason}
onEndCallReasonChange={handleEndCallReasonChange}
endCallReasonDescription={endCallReasonDescription}
onEndCallReasonDescriptionChange={setEndCallReasonDescription}
/> />
) : isTransferCallTool ? ( ) : isTransferCallTool ? (
<TransferCallToolConfig <TransferCallToolConfig

View file

@ -3,6 +3,8 @@
import { Cog, Globe, type LucideIcon, PhoneForwarded, PhoneOff, Puzzle } from "lucide-react"; import { Cog, Globe, type LucideIcon, PhoneForwarded, PhoneOff, Puzzle } from "lucide-react";
import { type ReactNode } from "react"; import { type ReactNode } from "react";
import type { EndCallConfig } from "@/client/types.gen";
export type ToolCategory = "http_api" | "end_call" | "transfer_call" | "native" | "integration"; export type ToolCategory = "http_api" | "end_call" | "transfer_call" | "native" | "integration";
export type EndCallMessageType = "none" | "custom"; export type EndCallMessageType = "none" | "custom";
@ -110,15 +112,13 @@ export function getToolTypeLabel(category: string): string {
} }
} }
// End Call tool specific configuration export const DEFAULT_END_CALL_REASON_DESCRIPTION =
export interface EndCallConfig { "The reason for ending the call (e.g., 'voicemail_detected', 'issue_resolved', 'customer_requested')";
messageType: EndCallMessageType;
customMessage?: string;
}
export const DEFAULT_END_CALL_CONFIG: EndCallConfig = { export const DEFAULT_END_CALL_CONFIG: EndCallConfig = {
messageType: "none", messageType: "none",
customMessage: "", customMessage: "",
endCallReason: false,
}; };
// Transfer Call tool specific configuration // Transfer Call tool specific configuration

View file

@ -569,6 +569,14 @@ export type EndCallConfig = {
* Custom message to play before ending the call * Custom message to play before ending the call
*/ */
customMessage?: string | null; customMessage?: string | null;
/**
* When enabled, LLM must provide a reason for ending the call. The reason is set as call disposition and added to call tags.
*/
endCallReason?: boolean;
/**
* Description shown to the LLM for the reason parameter. Used only when endCallReason is enabled.
*/
endCallReasonDescription?: string | null;
}; };
/** /**