diff --git a/api/services/pipecat/realtime_feedback_observer.py b/api/services/pipecat/realtime_feedback_observer.py index ff12272..2229069 100644 --- a/api/services/pipecat/realtime_feedback_observer.py +++ b/api/services/pipecat/realtime_feedback_observer.py @@ -30,6 +30,8 @@ if TYPE_CHECKING: from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer from pipecat.frames.frames import ( + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, CancelFrame, EndFrame, ErrorFrame, @@ -37,11 +39,12 @@ from pipecat.frames.frames import ( FunctionCallResultFrame, InterimTranscriptionFrame, InterruptionFrame, - LLMTextFrame, MetricsFrame, StopFrame, TranscriptionFrame, - TTSSpeakFrame, + TTSTextFrame, + UserMuteStartedFrame, + UserMuteStoppedFrame, ) from pipecat.metrics.metrics import TTFBMetricsData from pipecat.observers.base_observer import BaseObserver, FramePushed @@ -174,6 +177,30 @@ class RealtimeFeedbackObserver(BaseObserver): await self._handle_interruption() return + # Bot speaking state - WS only (ephemeral state signals, not persisted) + if isinstance(frame, BotStartedSpeakingFrame): + await self._send_ws( + {"type": RealtimeFeedbackType.BOT_STARTED_SPEAKING.value, "payload": {}} + ) + return + if isinstance(frame, BotStoppedSpeakingFrame): + await self._send_ws( + {"type": RealtimeFeedbackType.BOT_STOPPED_SPEAKING.value, "payload": {}} + ) + return + + # User mute state - WS only (ephemeral state signals, not persisted) + if isinstance(frame, UserMuteStartedFrame): + await self._send_ws( + {"type": RealtimeFeedbackType.USER_MUTE_STARTED.value, "payload": {}} + ) + return + if isinstance(frame, UserMuteStoppedFrame): + await self._send_ws( + {"type": RealtimeFeedbackType.USER_MUTE_STOPPED.value, "payload": {}} + ) + return + # Skip already processed frames (frames can be observed multiple times) if frame.id in self._frames_seen: return @@ -206,20 +233,9 @@ class RealtimeFeedbackObserver(BaseObserver): }, } ) - # Handle TTSSpeakFrame (e.g. 
greeting) - send immediately via WS only - # Final turn text is persisted via on_assistant_turn_stopped to avoid duplication - elif isinstance(frame, TTSSpeakFrame): - await self._send_ws( - { - "type": RealtimeFeedbackType.BOT_TEXT.value, - "payload": { - "text": frame.text, - }, - } - ) # Handle bot TTS text - respect pts timing, WebSocket only # Complete turn text is persisted via register_turn_handlers - elif isinstance(frame, LLMTextFrame): + elif isinstance(frame, TTSTextFrame): message = { "type": RealtimeFeedbackType.BOT_TEXT.value, "payload": { diff --git a/api/services/pipecat/recording_router_processor.py b/api/services/pipecat/recording_router_processor.py index 59611ef..be4eb5b 100644 --- a/api/services/pipecat/recording_router_processor.py +++ b/api/services/pipecat/recording_router_processor.py @@ -29,6 +29,7 @@ from pipecat.frames.frames import ( TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame, + TTSTextFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -98,11 +99,9 @@ class RecordingRouterProcessor(FrameProcessor): await self.push_frame(frame, direction) return - # --- Recording mode: buffer recording_id, suppress TTS --- + # --- Recording mode: accumulate recording_id silently --- if self._mode == "recording": self._recording_id_buffer += frame.text - frame.skip_tts = True - await self.push_frame(frame, direction) return # --- Detection mode: buffer until marker found --- @@ -114,13 +113,11 @@ class RecordingRouterProcessor(FrameProcessor): self._mode = "recording" marker_end = buffered_text.index(RECORDING_MARKER) + len(RECORDING_MARKER) - # Push buffered frames with skip_tts, extract recording_id from post-marker text + # Extract recording_id from post-marker text (don't push frames) cumulative = 0 for buf_frame, buf_dir in self._frame_buffer: - buf_frame.skip_tts = True frame_start = cumulative cumulative += len(buf_frame.text) - await self.push_frame(buf_frame, buf_dir) # Capture any recording_id text 
after the marker if cumulative > marker_end: @@ -183,6 +180,13 @@ class RecordingRouterProcessor(FrameProcessor): if self._mode == "recording": recording_id = self._recording_id_buffer.strip() if recording_id: + # Push accumulated text as TTSTextFrame for UI feedback via observer + await self.push_frame( + TTSTextFrame( + text=RECORDING_MARKER + self._recording_id_buffer, + aggregated_by="recording_router", + ) + ) await self._play_recording(recording_id) else: logger.warning( diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index deee846..c98a46b 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -526,6 +526,7 @@ async def _run_pipeline( node_name: str, previous_node_id: Optional[str], previous_node_name: Optional[str], + allow_interrupt: bool = False, ) -> None: """Send node transition event to logs buffer and optionally via WebSocket.""" # Update current node on the buffer so subsequent events are tagged @@ -538,6 +539,7 @@ async def _run_pipeline( "node_name": node_name, "previous_node_id": previous_node_id, "previous_node_name": previous_node_name, + "allow_interrupt": allow_interrupt, }, } # Send via WebSocket if available diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index d95c832..4649ec0 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -67,7 +67,7 @@ class PipecatEngine: call_context_vars: dict, workflow_run_id: Optional[int] = None, node_transition_callback: Optional[ - Callable[[str, str, Optional[str], Optional[str]], Awaitable[None]] + Callable[[str, str, Optional[str], Optional[str], bool], Awaitable[None]] ] = None, embeddings_api_key: Optional[str] = None, embeddings_model: Optional[str] = None, @@ -521,7 +521,11 @@ class PipecatEngine: if self._node_transition_callback: try: await self._node_transition_callback( - node_id, node.name, previous_node_id, 
previous_node_name + node_id, + node.name, + previous_node_id, + previous_node_name, + node.allow_interrupt, ) except Exception as e: # Log but don't fail - feedback is non-critical diff --git a/docs/configurations/interruption.mdx b/docs/configurations/interruption.mdx new file mode 100644 index 0000000..56baa1f --- /dev/null +++ b/docs/configurations/interruption.mdx @@ -0,0 +1,57 @@ +--- +title: "Interruption Handling" +description: "Control whether users can interrupt the bot while it is speaking by configuring the Allow Interruption toggle on each node." +--- + +## Overview + +Interruption handling controls whether the user can "barge in" and interrupt the bot while it is speaking. This is configured **per node** in the workflow editor, giving you fine-grained control over conversation flow. + +![Allow Interruption Toggle](../images/allow-interruption.png) + +## How It Works + +Each node in your workflow has an **Allow Interruption** toggle: + +- **Disabled (default)** — The bot finishes its entire response before accepting user input. The user's microphone is temporarily muted while the bot speaks. +- **Enabled** — The bot stops speaking as soon as the user starts talking, and immediately processes their input. This creates a natural, conversational experience. + + +When interruption is disabled and the user tries to speak during bot speech, a one-time warning appears in the live transcript indicating that interruption is disabled for that step. + + +## When to Disable Interruption + +Disabling interruption is useful when the bot needs to deliver a complete message without being cut off: + +- **Legal disclaimers** — Ensure the full disclaimer is spoken before proceeding. +- **Critical instructions** — Step-by-step directions that lose meaning if partially heard. +- **Greeting or introduction** — Let the bot finish its opening before the user responds. +- **Confirmation summaries** — Read back important details (appointment times, order totals) in full. 
+ +## When to Enable Interruption + +Enable interruption for interactive conversation stages: + +- **Q&A or objection handling** — Let the user jump in naturally. +- **Open-ended discussion** — Feels more human when either party can interject. +- **Long responses** — Allow the user to redirect if the bot goes off track. + +## Configuring Interruption + +1. Open your workflow in the **Voice Agent Builder**. +2. Select the node you want to configure. +3. Toggle **Allow Interruption** on or off in the node settings panel. +4. Save your workflow. + +You can set different interruption behavior for each node. For example, disable interruption on your Start Node greeting but enable it on all subsequent Agent Nodes. + +## What the User Experiences + +| Interruption | Bot Speaking | User Speaks | Result | +|---|---|---|---| +| Enabled | Yes | Yes | Bot stops, processes user input | +| Disabled | Yes | Yes | Bot continues, user input is ignored until bot finishes | +| Either | No | Yes | User input is processed normally | + +When interruption is disabled, the platform mutes the user's audio input while the bot is speaking. Once the bot finishes, the microphone is automatically unmuted and the user can respond normally. 
\ No newline at end of file diff --git a/docs/docs.json b/docs/docs.json index df1ea54..0c3ef4b 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -45,6 +45,7 @@ "configurations/voice", "configurations/transcriber", "configurations/api-keys", + "configurations/interruption", "configurations/tracing" ] }, diff --git a/docs/images/allow-interruption.png b/docs/images/allow-interruption.png new file mode 100644 index 0000000..d7cf78a Binary files /dev/null and b/docs/images/allow-interruption.png differ diff --git a/pipecat b/pipecat index 1960013..a1fc7ab 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 196001311d13dd8b1a03c4f566e4979176d6ed48 +Subproject commit a1fc7ab3c80e3ef963bb03dccc80653be5ede20e diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx index 9a0f031..8e5937e 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx @@ -17,6 +17,7 @@ interface RealtimeFeedbackEvent { result?: string; node_name?: string; previous_node?: string; + allow_interrupt?: boolean; ttfb_seconds?: number; processor?: string; model?: string; @@ -79,6 +80,9 @@ function convertLogEventsToTranscriptEvents(events: RealtimeFeedbackEvent[]): Tr case 'rtf-pipeline-error': type = 'pipeline-error'; break; + case 'rtf-interrupt-warning': + type = 'interrupt-warning'; + break; default: type = 'bot-text'; } @@ -93,6 +97,7 @@ function convertLogEventsToTranscriptEvents(events: RealtimeFeedbackEvent[]): Tr status, nodeName: event.payload.node_name, previousNode: event.payload.previous_node, + allowInterrupt: event.payload.allow_interrupt, ttfbSeconds: event.payload.ttfb_seconds, processor: event.payload.processor, model: event.payload.model, @@ -114,6 +119,7 @@ function convertLiveMessagesToTranscriptEvents(messages: FeedbackMessage[]): Tra 
status: msg.status, nodeName: msg.nodeName, previousNode: msg.previousNode, + allowInterrupt: msg.allowInterrupt, ttfbSeconds: msg.ttfbSeconds, processor: msg.processor, model: msg.model, diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx index 3fff78b..c3a0038 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx @@ -48,6 +48,7 @@ export const UnifiedTranscript = ({ functionName: msg.functionName, status: msg.status, nodeName: msg.nodeName, + allowInterrupt: msg.allowInterrupt, ttfbSeconds: msg.ttfbSeconds, fatal: msg.fatal, })); diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx index 19b430a..d2efe5a 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx @@ -1,16 +1,17 @@ 'use client'; -import { AlertTriangle, Brain, GitBranch, Wrench } from 'lucide-react'; +import { AlertTriangle, Brain, ExternalLink, GitBranch, MicOff, Wrench } from 'lucide-react'; import { cn } from '@/lib/utils'; export interface TranscriptMessageData { id: string; - type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric' | 'pipeline-error'; + type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric' | 'pipeline-error' | 'interrupt-warning'; text: string; final?: boolean; functionName?: string; nodeName?: string; + allowInterrupt?: boolean; ttfbSeconds?: number; fatal?: boolean; } @@ -37,6 +38,31 @@ export function TranscriptMessage({ message, nextMessage }: TranscriptMessagePro ); } + // Interrupt warning - show as an 
amber alert (one-time) + if (message.type === 'interrupt-warning') { + return ( +
+ +
+
+ Interruption Disabled +
+
+ {message.text} +
+ + Learn more + +
+
+ ); + } + // Pipeline error - show as a red alert if (message.type === 'pipeline-error') { return ( diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx index 8446509..a2b4733 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx @@ -18,7 +18,7 @@ interface UseWebSocketRTCProps { export interface FeedbackMessage { id: string; - type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric' | 'pipeline-error'; + type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric' | 'pipeline-error' | 'interrupt-warning'; text: string; final?: boolean; timestamp: string; @@ -27,6 +27,7 @@ export interface FeedbackMessage { // Node transition fields nodeName?: string; previousNode?: string; + allowInterrupt?: boolean; // TTFB metric fields ttfbSeconds?: number; processor?: string; @@ -82,6 +83,12 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia const pc_id = useRef(generateSecureId()); + // Mute/speaking state tracking refs (ephemeral signals, not rendered directly) + const userMutedRef = useRef(false); + const firstBotSpeechCompletedRef = useRef(false); + const currentAllowInterruptRef = useRef(undefined); + const interruptWarningShownRef = useRef(false); + // Get WebSocket URL from client configuration const getWebSocketUrl = useCallback(() => { // Get base URL from client configuration @@ -287,6 +294,24 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia case 'rtf-user-transcription': { const transcription = message.payload; + + // Show one-time warning if user speaks while muted on a no-interrupt node + // Skip during initial bot greeting (muted by MuteUntilFirstBotComplete strategy) + if ( + !interruptWarningShownRef.current && + 
firstBotSpeechCompletedRef.current && + userMutedRef.current && + currentAllowInterruptRef.current === false + ) { + interruptWarningShownRef.current = true; + setFeedbackMessages(prev => [...prev, { + id: `interrupt-warning-${Date.now()}`, + type: 'interrupt-warning', + text: 'Interruption is disabled for this step. The bot will finish speaking before processing your input. You can enable interruption in the workflow editor.', + timestamp: new Date().toISOString(), + }]); + } + setFeedbackMessages(prev => { // Step 1: Finalize the last bot message (user started speaking) const messagesWithBotFinalized = prev.map((msg, idx) => { @@ -322,7 +347,7 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia // Append to existing bot message return [ ...prev.slice(0, -1), - { ...last, text: last.text + message.payload.text } + { ...last, text: last.text + ' ' + message.payload.text } ]; } // Start new bot message @@ -368,13 +393,15 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia } case 'rtf-node-transition': { - const { node_name, previous_node } = message.payload; + const { node_name, previous_node_name, allow_interrupt } = message.payload; + currentAllowInterruptRef.current = allow_interrupt; setFeedbackMessages(prev => [...prev, { id: `node-${Date.now()}`, type: 'node-transition', text: node_name, nodeName: node_name, - previousNode: previous_node, + previousNode: previous_node_name, + allowInterrupt: allow_interrupt, timestamp: new Date().toISOString(), }]); break; @@ -407,6 +434,24 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia break; } + // Ephemeral state signals — update refs only, no UI messages + case 'rtf-bot-started-speaking': + break; + + case 'rtf-bot-stopped-speaking': + if (!firstBotSpeechCompletedRef.current) { + firstBotSpeechCompletedRef.current = true; + } + break; + + case 'rtf-user-mute-started': + userMutedRef.current = true; + break; + + case 
'rtf-user-mute-stopped': + userMutedRef.current = false; + break; + default: logger.warn('Unknown message type:', message.type); } diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts b/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts index 44ed9c5..5429ab2 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts @@ -4,7 +4,7 @@ */ export interface TranscriptEvent { - type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric' | 'pipeline-error'; + type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric' | 'pipeline-error' | 'interrupt-warning'; text: string; final?: boolean; timestamp: string; @@ -13,6 +13,7 @@ export interface TranscriptEvent { status?: 'running' | 'completed'; nodeName?: string; previousNode?: string; + allowInterrupt?: boolean; ttfbSeconds?: number; processor?: string; model?: string; @@ -28,6 +29,7 @@ export interface ProcessedMessage { functionName?: string; status?: 'running' | 'completed'; nodeName?: string; + allowInterrupt?: boolean; ttfbSeconds?: number; fatal?: boolean; } @@ -69,7 +71,7 @@ export function processTranscriptEvents(events: TranscriptEvent[]): ProcessedMes } else if (event.type === 'bot-text') { // Combine consecutive bot-text from the same turn if (currentBotText && currentBotText.event.turn === event.turn) { - currentBotText.text = currentBotText.text + event.text; + currentBotText.text = currentBotText.text + ' ' + event.text; } else { flushBotText(); currentBotText = { event, text: event.text }; @@ -144,6 +146,7 @@ function convertToProcessedMessage(event: TranscriptEvent, overrideText?: string functionName: event.functionName, status: event.status, nodeName: event.nodeName, + allowInterrupt: event.allowInterrupt, ttfbSeconds: event.ttfbSeconds, fatal: event.fatal, };