diff --git a/api/routes/tool.py b/api/routes/tool.py index 6430b1a..d54589e 100644 --- a/api/routes/tool.py +++ b/api/routes/tool.py @@ -55,6 +55,16 @@ class EndCallConfig(BaseModel): customMessage: Optional[str] = Field( default=None, description="Custom message to play before ending the call" ) + endCallReason: bool = Field( + default=False, + description="When enabled, LLM must provide a reason for ending the call. " + "The reason is set as call disposition and added to call tags.", + ) + endCallReasonDescription: Optional[str] = Field( + default=None, + description="Description shown to the LLM for the reason parameter. " + "Used only when endCallReason is enabled.", + ) class TransferCallConfig(BaseModel): diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py index b8d4111..9fc334b 100644 --- a/api/services/pipecat/event_handlers.py +++ b/api/services/pipecat/event_handlers.py @@ -7,7 +7,6 @@ from api.services.pipecat.audio_config import AudioConfig from api.services.pipecat.in_memory_buffers import ( InMemoryAudioBuffer, InMemoryLogsBuffer, - InMemoryTranscriptBuffer, ) from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator from api.services.workflow.pipecat_engine import PipecatEngine @@ -32,7 +31,7 @@ def register_event_handlers( """Register all event handlers for transport and task events. Returns: - Tuple of (in_memory_audio_buffer, in_memory_transcript_buffer) for use by other handlers. + in_memory_audio_buffer for use by other handlers. 
""" # Initialize in-memory buffers with proper audio configuration sample_rate = audio_config.pipeline_sample_rate if audio_config else 16000 @@ -48,8 +47,6 @@ def register_event_handlers( sample_rate=sample_rate, num_channels=num_channels, ) - in_memory_transcript_buffer = InMemoryTranscriptBuffer(workflow_run_id) - # Track both events to ensure LLM is only triggered after both occur ready_state = { "pipeline_started": False, @@ -123,23 +120,22 @@ def register_event_handlers( gathered_context = {**gathered_context, **workflow_run.gathered_context} # Set user_speech call tag - if in_memory_transcript_buffer: - call_tags = gathered_context.get("call_tags", []) + call_tags = gathered_context.get("call_tags", []) - try: - has_user_speech = in_memory_transcript_buffer.contains_user_speech() - except Exception: - has_user_speech = False + try: + has_user_speech = in_memory_logs_buffer.contains_user_speech() + except Exception: + has_user_speech = False - if has_user_speech and "user_speech" not in call_tags: - call_tags.append("user_speech") + if has_user_speech and "user_speech" not in call_tags: + call_tags.append("user_speech") - # Append any keys from gathered_context that start with 'tag_' to call_tags - for key in gathered_context: - if key.startswith("tag_") and key not in call_tags: - call_tags.append(gathered_context[key]) + # Append any keys from gathered_context that start with 'tag_' to call_tags + for key in gathered_context: + if key.startswith("tag_") and key not in call_tags: + call_tags.append(gathered_context[key]) - gathered_context["call_tags"] = call_tags + gathered_context["call_tags"] = call_tags # Clean up engine resources (including voicemail detector) await engine.cleanup() @@ -213,12 +209,9 @@ def register_event_handlers( else: logger.debug("Audio buffer is empty, skipping upload") - if not in_memory_transcript_buffer.is_empty: - transcript_temp_path = ( - await in_memory_transcript_buffer.write_to_temp_file() - ) - else: - 
logger.debug("Transcript buffer is empty, skipping upload") + transcript_temp_path = in_memory_logs_buffer.write_transcript_to_temp_file() + if not transcript_temp_path: + logger.debug("No transcript events in logs buffer, skipping upload") except Exception as e: logger.error(f"Error preparing buffers for S3 upload: {e}", exc_info=True) @@ -233,8 +226,8 @@ def register_event_handlers( transcript_temp_path, ) - # Return the buffers so they can be passed to other handlers - return in_memory_audio_buffer, in_memory_transcript_buffer + # Return the buffer so it can be passed to other handlers + return in_memory_audio_buffer def register_audio_data_handler( @@ -256,28 +249,3 @@ def register_audio_data_handler( except MemoryError as e: logger.error(f"Memory buffer full: {e}") # Could implement overflow to disk here if needed - - -def register_transcript_handlers( - user_aggregator, - assistant_aggregator, - workflow_run_id, - in_memory_buffer: InMemoryTranscriptBuffer, -): - """Register event handlers for transcript updates on context aggregators. - - Uses the on_user_turn_stopped and on_assistant_turn_stopped events to capture - transcripts as turns complete, following the event-based pattern. 
- """ - - @user_aggregator.event_handler("on_user_turn_stopped") - async def on_user_turn_stopped(aggregator, strategy, message): - timestamp = f"[{message.timestamp}] " if message.timestamp else "" - line = f"{timestamp}user: {message.content}\n" - await in_memory_buffer.append(line) - - @assistant_aggregator.event_handler("on_assistant_turn_stopped") - async def on_assistant_turn_stopped(aggregator, message): - timestamp = f"[{message.timestamp}] " if message.timestamp else "" - line = f"{timestamp}assistant: {message.content}\n" - await in_memory_buffer.append(line) diff --git a/api/services/pipecat/in_memory_buffers.py b/api/services/pipecat/in_memory_buffers.py index c4274f4..833e7aa 100644 --- a/api/services/pipecat/in_memory_buffers.py +++ b/api/services/pipecat/in_memory_buffers.py @@ -1,12 +1,13 @@ import asyncio -import re import tempfile import wave from datetime import UTC, datetime -from typing import List +from typing import List, Optional from loguru import logger +from pipecat.utils.enums import RealtimeFeedbackType + class InMemoryAudioBuffer: """Buffer audio data in memory during a call, then write to temp file on disconnect.""" @@ -69,60 +70,6 @@ class InMemoryAudioBuffer: return self._total_size -class InMemoryTranscriptBuffer: - """Buffer transcript data in memory during a call, then write to temp file on disconnect.""" - - # Compiled regex to identify user speech lines, e.g. 
- # [2025-06-29T12:34:56.789+00:00] user: hello - _USER_SPEECH_RE: re.Pattern[str] = re.compile( - r"^\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}\+\d{2}:\d{2}\] user: .+" - ) - - def __init__(self, workflow_run_id: int): - self._workflow_run_id = workflow_run_id - self._lines: List[str] = [] - self._lock = asyncio.Lock() - - async def append(self, transcript: str): - """Append transcript text to the buffer.""" - async with self._lock: - self._lines.append(transcript) - logger.trace( - f"Appended transcript line to buffer for workflow {self._workflow_run_id}" - ) - - async def write_to_temp_file(self) -> str: - """Write transcript to a temporary text file and return the path.""" - async with self._lock: - temp_file = tempfile.NamedTemporaryFile( - mode="w", suffix=".txt", delete=False - ) - logger.debug( - f"Writing transcript buffer to temp file {temp_file.name} for workflow {self._workflow_run_id}" - ) - - content = "".join(self._lines) - temp_file.write(content) - temp_file.close() - - logger.info( - f"Successfully wrote {len(content)} chars of transcript to {temp_file.name}" - ) - return temp_file.name - - @property - def is_empty(self) -> bool: - """Check if the buffer is empty.""" - return len(self._lines) == 0 - - def contains_user_speech(self) -> bool: - """Return True if any buffered transcript line matches the user speech pattern.""" - for line in self._lines: - if self._USER_SPEECH_RE.match(line): - return True - return False - - class InMemoryLogsBuffer: """Buffer real-time feedback events in memory during a call, then save to workflow run logs.""" @@ -130,15 +77,36 @@ class InMemoryLogsBuffer: self._workflow_run_id = workflow_run_id self._events: List[dict] = [] self._turn_counter = 0 + self._current_node_id: Optional[str] = None + self._current_node_name: Optional[str] = None + + def set_current_node(self, node_id: str, node_name: str): + """Set the current node ID and name to be injected into subsequent events.""" + self._current_node_id = node_id + 
self._current_node_name = node_name + + @property + def current_node_id(self) -> Optional[str]: + """Get the current node ID.""" + return self._current_node_id + + @property + def current_node_name(self) -> Optional[str]: + """Get the current node name.""" + return self._current_node_name async def append(self, event: dict): - """Append a feedback event to the buffer with timestamp.""" - # Add timestamp and turn tracking + """Append a feedback event to the buffer with timestamp and current node.""" + # Add timestamp, turn tracking, and current node timestamped_event = { **event, "timestamp": datetime.now(UTC).isoformat(), "turn": self._turn_counter, } + if self._current_node_id: + timestamped_event["node_id"] = self._current_node_id + if self._current_node_name: + timestamped_event["node_name"] = self._current_node_name self._events.append(timestamped_event) logger.trace( f"Appended event {event.get('type')} to logs buffer for workflow {self._workflow_run_id}" @@ -155,6 +123,63 @@ class InMemoryLogsBuffer: """Get all events for final storage.""" return self._events + def contains_user_speech(self) -> bool: + """Return True if any final user transcription event has non-empty text.""" + for event in self._events: + if ( + event.get("type") == RealtimeFeedbackType.USER_TRANSCRIPTION.value + and event.get("payload", {}).get("final") is True + and event.get("payload", {}).get("text") + ): + return True + return False + + def generate_transcript_text(self) -> str: + """Generate transcript text from logged events. + + Filters for rtf-user-transcription (final) and rtf-bot-text events, + formats them as '[timestamp] user/assistant: text\n'. 
+ """ + lines: List[str] = [] + for event in self._events: + event_type = event.get("type") + payload = event.get("payload", {}) + + if ( + event_type == RealtimeFeedbackType.USER_TRANSCRIPTION.value + and payload.get("final") is True + ): + timestamp = payload.get("timestamp", "") + prefix = f"[{timestamp}] " if timestamp else "" + lines.append(f"{prefix}user: {payload.get('text', '')}\n") + elif event_type == RealtimeFeedbackType.BOT_TEXT.value: + timestamp = payload.get("timestamp", "") + prefix = f"[{timestamp}] " if timestamp else "" + lines.append(f"{prefix}assistant: {payload.get('text', '')}\n") + + return "".join(lines) + + def write_transcript_to_temp_file(self) -> Optional[str]: + """Write transcript to a temporary text file and return the path. + + Returns None if there are no transcript events. + """ + content = self.generate_transcript_text() + if not content: + return None + + temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) + logger.debug( + f"Writing transcript to temp file {temp_file.name} for workflow {self._workflow_run_id}" + ) + temp_file.write(content) + temp_file.close() + + logger.info( + f"Successfully wrote {len(content)} chars of transcript to {temp_file.name}" + ) + return temp_file.name + @property def is_empty(self) -> bool: """Check if the buffer is empty.""" diff --git a/api/services/pipecat/realtime_feedback_observer.py b/api/services/pipecat/realtime_feedback_observer.py index 94ce896..bde6c6a 100644 --- a/api/services/pipecat/realtime_feedback_observer.py +++ b/api/services/pipecat/realtime_feedback_observer.py @@ -8,6 +8,13 @@ For frames with presentation timestamps (pts), like TTSTextFrame, we respect the timing by queuing them and sending at the appropriate time, similar to how base_output.py handles timed frames. +Streaming vs. persisted data: +- WebSocket receives all events in real-time (interim transcriptions, TTS text + chunks, function calls, metrics) for live UI feedback. 
+- The logs buffer only stores final complete transcripts per turn (via + register_turn_log_handlers hooking into aggregator events), function calls, + and metrics — not interim/streaming data. + Note: Node transition events are sent directly from PipecatEngine.set_node() rather than being observed here, to ensure precise timing at the moment of node changes. @@ -37,17 +44,23 @@ from pipecat.frames.frames import ( from pipecat.metrics.metrics import TTFBMetricsData from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.processors.frame_processor import FrameDirection +from pipecat.utils.enums import RealtimeFeedbackType from pipecat.utils.time import nanoseconds_to_seconds class RealtimeFeedbackObserver(BaseObserver): - """Observer that sends real-time transcription, bot response, and metrics via WebSocket. + """Observer that sends real-time events via WebSocket and persists final transcripts. - Observes pipeline frames and sends events for: + WebSocket streaming (all events for live UI): - User transcriptions (interim and final) - Bot TTS text (with pts-based timing) - Function calls (start/end) - - TTFB metrics (LLM generation time only - filters to processors containing "LLM") + - TTFB metrics (LLM generation time only) + + Logs buffer persistence (only final data for post-call analysis): + - Complete user transcripts per turn (via on_user_turn_stopped) + - Complete assistant transcripts per turn (via on_assistant_turn_stopped) + - Function calls and TTFB metrics For frames with pts (presentation timestamp), we queue them and send at the appropriate time to sync with audio playback. 
@@ -134,8 +147,8 @@ class RealtimeFeedbackObserver(BaseObserver): if target_time > current_time: await asyncio.sleep(target_time - current_time) - # Send the message - await self._send_message(message) + # Send the message (clock queue only has TTS text, WS-only) + await self._send_ws(message) self._clock_queue.task_done() except asyncio.CancelledError: break @@ -164,11 +177,11 @@ class RealtimeFeedbackObserver(BaseObserver): return self._frames_seen.add(frame.id) - # Handle user transcriptions (interim) + # Handle user transcriptions (interim) - WebSocket only if isinstance(frame, InterimTranscriptionFrame): - await self._send_message( + await self._send_ws( { - "type": "rtf-user-transcription", + "type": RealtimeFeedbackType.USER_TRANSCRIPTION.value, "payload": { "text": frame.text, "final": False, @@ -177,11 +190,12 @@ class RealtimeFeedbackObserver(BaseObserver): }, } ) - # Handle user transcriptions (final) + # Handle user transcriptions (final) - WebSocket only + # Complete turn text is persisted via register_turn_log_handlers elif isinstance(frame, TranscriptionFrame): - await self._send_message( + await self._send_ws( { - "type": "rtf-user-transcription", + "type": RealtimeFeedbackType.USER_TRANSCRIPTION.value, "payload": { "text": frame.text, "final": True, @@ -190,13 +204,11 @@ class RealtimeFeedbackObserver(BaseObserver): }, } ) - # Increment turn counter on final user transcription - if self._logs_buffer: - self._logs_buffer.increment_turn() - # Handle bot TTS text - respect pts timing + # Handle bot TTS text - respect pts timing, WebSocket only + # Complete turn text is persisted via register_turn_log_handlers elif isinstance(frame, TTSTextFrame): message = { - "type": "rtf-bot-text", + "type": RealtimeFeedbackType.BOT_TEXT.value, "payload": { "text": frame.text, }, @@ -213,7 +225,7 @@ class RealtimeFeedbackObserver(BaseObserver): await self._clock_queue.put((frame.pts, frame.id, message)) else: # No pts, send immediately - await self._send_message(message) + 
await self._send_ws(message) # Handle function call in progress elif ( isinstance(frame, FunctionCallInProgressFrame) @@ -221,7 +233,7 @@ class RealtimeFeedbackObserver(BaseObserver): ): await self._send_message( { - "type": "rtf-function-call-start", + "type": RealtimeFeedbackType.FUNCTION_CALL_START.value, "payload": { "function_name": frame.function_name, "tool_call_id": frame.tool_call_id, @@ -235,7 +247,7 @@ class RealtimeFeedbackObserver(BaseObserver): ): await self._send_message( { - "type": "rtf-function-call-end", + "type": RealtimeFeedbackType.FUNCTION_CALL_END.value, "payload": { "function_name": frame.function_name, "tool_call_id": frame.tool_call_id, @@ -252,7 +264,7 @@ class RealtimeFeedbackObserver(BaseObserver): if metric_data.processor and "LLM" in metric_data.processor: await self._send_message( { - "type": "rtf-ttfb-metric", + "type": RealtimeFeedbackType.TTFB_METRIC.value, "payload": { "ttfb_seconds": metric_data.value, "processor": metric_data.processor, @@ -261,18 +273,77 @@ class RealtimeFeedbackObserver(BaseObserver): } ) - async def _send_message(self, message: dict): - """Send message via WebSocket AND append to logs buffer, handling errors gracefully.""" - # Send via WebSocket + async def _send_ws(self, message: dict): + """Send message via WebSocket only, handling errors gracefully.""" + if not self._ws_sender: + return try: + # Inject current node info from the logs buffer + if self._logs_buffer and self._logs_buffer.current_node_id: + message = { + **message, + "node_id": self._logs_buffer.current_node_id, + "node_name": self._logs_buffer.current_node_name, + } await self._ws_sender(message) except Exception as e: - # Log but don't fail - feedback is non-critical logger.debug(f"Failed to send real-time feedback message: {e}") - # Also append to logs buffer + async def _send_message(self, message: dict): + """Send message via WebSocket AND append to logs buffer.""" + await self._send_ws(message) + await self._append_to_buffer(message) + 
+ async def _append_to_buffer(self, message: dict): + """Append message to logs buffer, handling errors gracefully.""" if self._logs_buffer: try: await self._logs_buffer.append(message) except Exception as e: logger.error(f"Failed to append to logs buffer: {e}") + + +def register_turn_log_handlers( + logs_buffer: "InMemoryLogsBuffer", + user_aggregator, + assistant_aggregator, +): + """Register event handlers on aggregators to persist final turn transcripts. + + Hooks into on_user_turn_stopped and on_assistant_turn_stopped to store + complete turn text in the logs buffer. Works for both WebRTC and telephony + calls — independent of WebSocket availability. + """ + + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(aggregator, strategy, message): + logs_buffer.increment_turn() + try: + await logs_buffer.append( + { + "type": RealtimeFeedbackType.USER_TRANSCRIPTION.value, + "payload": { + "text": message.content, + "final": True, + "timestamp": message.timestamp, + }, + } + ) + except Exception as e: + logger.error(f"Failed to append user turn to logs buffer: {e}") + + @assistant_aggregator.event_handler("on_assistant_turn_stopped") + async def on_assistant_turn_stopped(aggregator, message): + if message.content: + try: + await logs_buffer.append( + { + "type": RealtimeFeedbackType.BOT_TEXT.value, + "payload": { + "text": message.content, + "timestamp": message.timestamp, + }, + } + ) + except Exception as e: + logger.error(f"Failed to append assistant turn to logs buffer: {e}") diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index e202a2c..c16e1e7 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -12,7 +12,6 @@ from api.services.pipecat.audio_config import AudioConfig, create_audio_config from api.services.pipecat.event_handlers import ( register_audio_data_handler, register_event_handlers, - register_transcript_handlers, ) from 
api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer from api.services.pipecat.pipeline_builder import ( @@ -24,7 +23,10 @@ from api.services.pipecat.pipeline_engine_callbacks_processor import ( PipelineEngineCallbacksProcessor, ) from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator -from api.services.pipecat.realtime_feedback_observer import RealtimeFeedbackObserver +from api.services.pipecat.realtime_feedback_observer import ( + RealtimeFeedbackObserver, + register_turn_log_handlers, +) from api.services.pipecat.service_factory import ( create_llm_service, create_stt_service, @@ -73,7 +75,7 @@ from pipecat.turns.user_stop import ( TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies -from pipecat.utils.enums import EndTaskReason +from pipecat.utils.enums import EndTaskReason, RealtimeFeedbackType from pipecat.utils.run_context import set_current_run_id from pipecat.utils.tracing.context_registry import ContextProviderRegistry @@ -511,35 +513,42 @@ async def _run_pipeline( # Create in-memory logs buffer early so it can be used by engine callbacks in_memory_logs_buffer = InMemoryLogsBuffer(workflow_run_id) - # Create node transition callback if WebSocket sender is available - node_transition_callback = None + # Create node transition callback (always logs to buffer, optionally streams to WS) ws_sender = get_ws_sender(workflow_run_id) - if ws_sender: - async def send_node_transition( - node_name: str, previous_node: Optional[str] - ) -> None: - """Send node transition event via WebSocket AND log to buffer.""" - message = { - "type": "rtf-node-transition", - "payload": { - "node_name": node_name, - "previous_node": previous_node, - }, - } - # Send via WebSocket + async def send_node_transition( + node_id: str, + node_name: str, + previous_node_id: Optional[str], + previous_node_name: Optional[str], + ) -> None: + """Send node transition event to logs buffer and optionally 
via WebSocket.""" + # Update current node on the buffer so subsequent events are tagged + in_memory_logs_buffer.set_current_node(node_id, node_name) + + message = { + "type": RealtimeFeedbackType.NODE_TRANSITION.value, + "payload": { + "node_id": node_id, + "node_name": node_name, + "previous_node_id": previous_node_id, + "previous_node_name": previous_node_name, + }, + } + # Send via WebSocket if available + if ws_sender: try: - await ws_sender(message) + await ws_sender({**message, "node_id": node_id, "node_name": node_name}) except Exception as e: logger.debug(f"Failed to send node transition via WebSocket: {e}") - # Log to in-memory buffer - try: - await in_memory_logs_buffer.append(message) - except Exception as e: - logger.error(f"Failed to append node transition to logs buffer: {e}") + # Always log to in-memory buffer (node_id/node_name injected by buffer's append) + try: + await in_memory_logs_buffer.append(message) + except Exception as e: + logger.error(f"Failed to append node transition to logs buffer: {e}") - node_transition_callback = send_node_transition + node_transition_callback = send_node_transition # Extract embeddings configuration from user config embeddings_api_key = None @@ -694,17 +703,48 @@ async def _run_pipeline( # Initialize the engine to set the initial context await engine.initialize() - # Add real-time feedback observer if WebSocket sender is available - # Note: ws_sender was already fetched earlier for node_transition_callback - if ws_sender: - feedback_observer = RealtimeFeedbackObserver( - ws_sender=ws_sender, - logs_buffer=in_memory_logs_buffer, - ) - task.add_observer(feedback_observer) + # Add real-time feedback observer (always logs to buffer, streams to WS if available) + feedback_observer = RealtimeFeedbackObserver( + ws_sender=ws_sender, + logs_buffer=in_memory_logs_buffer, + ) + task.add_observer(feedback_observer) + + # Register latency observer to log user-to-bot response latency + if task.user_bot_latency_observer: + + 
@task.user_bot_latency_observer.event_handler("on_latency_measured") + async def on_latency_measured(observer, latency_seconds): + message = { + "type": RealtimeFeedbackType.LATENCY_MEASURED.value, + "payload": { + "latency_seconds": latency_seconds, + }, + } + if ws_sender: + try: + ws_message = message + if in_memory_logs_buffer.current_node_id: + ws_message = { + **message, + "node_id": in_memory_logs_buffer.current_node_id, + "node_name": in_memory_logs_buffer.current_node_name, + } + await ws_sender(ws_message) + except Exception as e: + logger.debug(f"Failed to send latency via WebSocket: {e}") + try: + await in_memory_logs_buffer.append(message) + except Exception as e: + logger.error(f"Failed to append latency to logs buffer: {e}") + + # Register turn log handlers for all call types (WebRTC and telephony) + register_turn_log_handlers( + in_memory_logs_buffer, user_context_aggregator, assistant_context_aggregator + ) # Register event handlers - in_memory_audio_buffer, in_memory_transcript_buffer = register_event_handlers( + in_memory_audio_buffer = register_event_handlers( task, transport, workflow_run_id, @@ -716,12 +756,6 @@ async def _run_pipeline( ) register_audio_data_handler(audio_buffer, workflow_run_id, in_memory_audio_buffer) - register_transcript_handlers( - user_context_aggregator, - assistant_context_aggregator, - workflow_run_id, - in_memory_transcript_buffer, - ) try: # Run the pipeline diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index cfee3cb..59c4d59 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -63,7 +63,7 @@ class PipecatEngine: call_context_vars: dict, workflow_run_id: Optional[int] = None, node_transition_callback: Optional[ - Callable[[str, Optional[str]], Awaitable[None]] + Callable[[str, str, Optional[str], Optional[str]], Awaitable[None]] ] = None, embeddings_api_key: Optional[str] = None, embeddings_model: Optional[str] = None, 
@@ -456,14 +456,22 @@ class PipecatEngine: # Track previous node for transition event previous_node_name = self._current_node.name if self._current_node else None + previous_node_id = self._current_node.id if self._current_node else None # Set current node for all nodes (including static ones) so STT mute filter works self._current_node = node + # Track visited nodes in gathered context for call tags + nodes_visited = self._gathered_context.setdefault("nodes_visited", []) + if node.name not in nodes_visited: + nodes_visited.append(node.name) + # Send node transition event if callback is provided if self._node_transition_callback: try: - await self._node_transition_callback(node.name, previous_node_name) + await self._node_transition_callback( + node_id, node.name, previous_node_id, previous_node_name + ) except Exception as e: # Log but don't fail - feedback is non-critical logger.debug(f"Failed to send node transition event: {e}") diff --git a/api/services/workflow/pipecat_engine_custom_tools.py b/api/services/workflow/pipecat_engine_custom_tools.py index 5bd4809..f226791 100644 --- a/api/services/workflow/pipecat_engine_custom_tools.py +++ b/api/services/workflow/pipecat_engine_custom_tools.py @@ -231,6 +231,20 @@ class CustomToolManager: message_type = config.get("messageType", "none") custom_message = config.get("customMessage", "") + # Handle end call reason if enabled + end_call_reason_enabled = config.get("endCallReason", False) + if end_call_reason_enabled: + reason = ( + function_call_params.arguments.get("reason", "") + or "end_call_tool" + ) + logger.info(f"End call reason: {reason}") + self._engine._gathered_context["call_disposition"] = reason + call_tags = self._engine._gathered_context.get("call_tags", []) + if reason not in call_tags: + call_tags.extend([reason, "end_call_tool"]) + self._engine._gathered_context["call_tags"] = call_tags + # Send result callback first await function_call_params.result_callback( {"status": "success", "action": 
"ending_call"}, diff --git a/api/services/workflow/tools/custom_tool.py b/api/services/workflow/tools/custom_tool.py index 7a9d6d9..da8775b 100644 --- a/api/services/workflow/tools/custom_tool.py +++ b/api/services/workflow/tools/custom_tool.py @@ -51,6 +51,19 @@ def tool_to_function_schema(tool: Any) -> Dict[str, Any]: if param_required: required.append(param_name) + # If this is an end_call tool with endCallReason enabled, add a required 'reason' parameter + if definition.get("type") == "end_call" and config.get("endCallReason", False): + default_description = ( + "The reason for ending the call (e.g., 'voicemail_detected', " + "'issue_resolved', 'customer_requested')" + ) + properties["reason"] = { + "type": "string", + "description": config.get("endCallReasonDescription") + or default_description, + } + required.append("reason") + # Sanitize tool name for function name (lowercase, underscores only) function_name = re.sub(r"[^a-z0-9_]", "_", tool.name.lower()) # Remove consecutive underscores and trim diff --git a/pipecat b/pipecat index fbc9a76..6aa0502 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit fbc9a768445e8f683721744659fc8904d4012081 +Subproject commit 6aa0502a9834d536aba9589cec87d827e66f2fad diff --git a/ui/src/app/tools/[toolUuid]/components/EndCallToolConfig.tsx b/ui/src/app/tools/[toolUuid]/components/EndCallToolConfig.tsx index 65a1db3..6185d7b 100644 --- a/ui/src/app/tools/[toolUuid]/components/EndCallToolConfig.tsx +++ b/ui/src/app/tools/[toolUuid]/components/EndCallToolConfig.tsx @@ -4,6 +4,7 @@ import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/com import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; import { RadioGroup, RadioGroupItem } from "@/components/ui/radio-group"; +import { Switch } from "@/components/ui/switch"; import { Textarea } from "@/components/ui/textarea"; import { type EndCallMessageType } from "../../config"; @@ -17,6 +18,10 @@ export 
interface EndCallToolConfigProps { onMessageTypeChange: (messageType: EndCallMessageType) => void; customMessage: string; onCustomMessageChange: (message: string) => void; + endCallReason: boolean; + onEndCallReasonChange: (enabled: boolean) => void; + endCallReasonDescription: string; + onEndCallReasonDescriptionChange: (description: string) => void; } export function EndCallToolConfig({ @@ -28,6 +33,10 @@ export function EndCallToolConfig({ onMessageTypeChange, customMessage, onCustomMessageChange, + endCallReason, + onEndCallReasonChange, + endCallReasonDescription, + onEndCallReasonDescriptionChange, }: EndCallToolConfigProps) { return ( @@ -63,6 +72,35 @@ export function EndCallToolConfig({ /> +
+
+ + +
+ + {endCallReason && ( +
+ + +