diff --git a/api/routes/webrtc_signaling.py b/api/routes/webrtc_signaling.py index ddcb4a1..26fe535 100644 --- a/api/routes/webrtc_signaling.py +++ b/api/routes/webrtc_signaling.py @@ -18,11 +18,16 @@ from aiortc import RTCIceServer from aiortc.sdp import candidate_from_sdp from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect from loguru import logger +from starlette.websockets import WebSocketState from api.db import db_client from api.db.models import UserModel from api.services.auth.depends import get_user_ws from api.services.pipecat.run_pipeline import run_pipeline_smallwebrtc +from api.services.pipecat.ws_sender_registry import ( + register_ws_sender, + unregister_ws_sender, +) from api.services.quota_service import check_dograh_quota from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection from pipecat.utils.context import set_current_run_id @@ -92,6 +97,9 @@ class SignalingManager: # Cleanup self._connections.pop(connection_id, None) + # Unregister WebSocket sender for real-time feedback + unregister_ws_sender(workflow_run_id) + # Clean up all peer connections for this workflow run # Note: In a WebSocket-based signaling approach (vs HTTP PATCH), # we maintain our own connection map instead of relying on @@ -182,6 +190,13 @@ class SignalingManager: # Store peer connection using client's pc_id self._peer_connections[pc_id] = pc + # Register WebSocket sender for real-time feedback + async def ws_sender(message: dict): + if ws.application_state == WebSocketState.CONNECTED: + await ws.send_json(message) + + register_ws_sender(workflow_run_id, ws_sender) + # Setup closed handler @pc.event_handler("closed") async def handle_disconnected(webrtc_connection: SmallWebRTCConnection): diff --git a/api/services/pipecat/realtime_feedback_observer.py b/api/services/pipecat/realtime_feedback_observer.py new file mode 100644 index 0000000..bf5275d --- /dev/null +++ b/api/services/pipecat/realtime_feedback_observer.py @@ -0,0 +1,227 @@ 
"""Real-time feedback observer for sending pipeline events to the frontend.

This observer watches pipeline frames and sends relevant events (transcriptions,
bot text, function calls) over WebSocket to provide real-time feedback in the UI.

For frames with presentation timestamps (pts), like TTSTextFrame, we respect
the timing by queuing them and sending at the appropriate time, similar to
how base_output.py handles timed frames.
"""

import asyncio
import time
from typing import Awaitable, Callable, Optional, Set

from loguru import logger

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    FunctionCallInProgressFrame,
    FunctionCallResultFrame,
    InterimTranscriptionFrame,
    InterruptionFrame,
    StopFrame,
    TranscriptionFrame,
    TTSTextFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection
from pipecat.utils.time import nanoseconds_to_seconds


class RealtimeFeedbackObserver(BaseObserver):
    """Observer that sends real-time transcription and bot response events via WebSocket.

    Messages emitted (``type`` field):
        * ``rtf-user-transcription`` - interim and final user transcriptions
        * ``rtf-bot-text`` - bot TTS text
        * ``rtf-function-call-start`` / ``rtf-function-call-end``

    For frames with pts (presentation timestamp), we queue them and send at the
    appropriate time to sync with audio playback.
    """

    # Frame types that result in a client message. Everything else is dropped
    # before the de-duplication bookkeeping, so ids of unrelated frames are
    # never stored in ``_frames_seen``.
    _REPORTED_FRAMES = (
        InterimTranscriptionFrame,
        TranscriptionFrame,
        TTSTextFrame,
        FunctionCallInProgressFrame,
        FunctionCallResultFrame,
    )

    # Frame types that are only reported when travelling downstream.
    _DOWNSTREAM_ONLY_FRAMES = (
        FunctionCallInProgressFrame,
        FunctionCallResultFrame,
    )

    def __init__(
        self,
        ws_sender: Callable[[dict], Awaitable[None]],
    ):
        """
        Args:
            ws_sender: Async function to send messages over WebSocket.
                Expected signature: async def send(message: dict) -> None
        """
        super().__init__()
        self._ws_sender = ws_sender
        # Ids of reported frames, used to skip repeated observations.
        # NOTE(review): annotated as str; confirm the actual type of frame.id.
        self._frames_seen: Set[str] = set()

        # Clock/timing for pts-based frames (similar to base_output.py)
        self._clock_queue: Optional[asyncio.PriorityQueue] = None
        self._clock_task: Optional[asyncio.Task] = None
        # Monotonic clock reading taken when the first pts frame was seen
        self._clock_start_time: Optional[float] = None
        # First pts value we saw; later pts values are measured against it
        self._pts_start_time: Optional[int] = None

    async def _ensure_clock_task(self):
        """Create the clock queue and its consumer task if they don't exist."""
        if self._clock_queue is None:
            self._clock_queue = asyncio.PriorityQueue()
            self._clock_task = asyncio.create_task(self._clock_task_handler())

    async def _cancel_clock_task(self):
        """Cancel the clock task and clear the queue.

        Called on interruption and on pipeline termination to discard any
        pending bot text that hasn't been sent yet. Safe to call repeatedly:
        it does nothing when no clock task exists.
        """
        if self._clock_task:
            self._clock_task.cancel()
            try:
                await self._clock_task
            except asyncio.CancelledError:
                pass
            self._clock_task = None
            self._clock_queue = None
            # Reset timing references so next bot response starts fresh
            self._clock_start_time = None
            self._pts_start_time = None

    async def _handle_interruption(self):
        """Handle interruption by clearing queued bot text.

        The clock task is cancelled here; a new one is created lazily the
        next time a frame with a pts is observed.
        """
        await self._cancel_clock_task()

    async def _clock_task_handler(self):
        """Process timed frames from the queue, respecting their presentation timestamps.

        Each queue entry is ``(pts, frame_id, message)``. The handler sleeps
        until the frame's pts, mapped onto the local monotonic clock, has
        arrived, then sends the message.
        """
        # Bind the queue once: the attribute is reset to None on cancellation
        # and this task must keep working against the queue it was started for.
        queue = self._clock_queue
        while True:
            try:
                pts, _frame_id, message = await queue.get()
                try:
                    if (
                        self._clock_start_time is not None
                        and self._pts_start_time is not None
                    ):
                        # Target time = start time + (frame pts - start pts) in seconds
                        target_time = self._clock_start_time + nanoseconds_to_seconds(
                            pts - self._pts_start_time
                        )
                        delay = target_time - time.monotonic()
                        if delay > 0:
                            await asyncio.sleep(delay)

                    await self._send_message(message)
                finally:
                    # Keep get()/task_done() balanced even when sending fails
                    # or the task is cancelled mid-sleep.
                    queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.debug(f"Clock task error: {e}")

    async def on_push_frame(self, data: FramePushed):
        """Process frames and send relevant ones to the client.

        Args:
            data: The observed push event; only ``data.frame`` and
                ``data.direction`` are used.
        """
        frame = data.frame

        # Handle pipeline termination - stop clock task
        if isinstance(frame, (EndFrame, CancelFrame, StopFrame)):
            await self._cancel_clock_task()
            return

        # Handle interruptions - clear any queued bot text
        if isinstance(frame, InterruptionFrame):
            await self._handle_interruption()
            return

        # Ignore frame types we never report, before touching _frames_seen.
        if not isinstance(frame, self._REPORTED_FRAMES):
            return

        # Apply the direction filter before de-duplication, so that an
        # upstream sighting cannot mark the frame as already handled.
        if (
            isinstance(frame, self._DOWNSTREAM_ONLY_FRAMES)
            and data.direction != FrameDirection.DOWNSTREAM
        ):
            return

        # Skip already processed frames (frames can be observed multiple times)
        if frame.id in self._frames_seen:
            return
        self._frames_seen.add(frame.id)

        if isinstance(frame, InterimTranscriptionFrame):
            await self._send_message(self._transcription_message(frame, final=False))
        elif isinstance(frame, TranscriptionFrame):
            await self._send_message(self._transcription_message(frame, final=True))
        elif isinstance(frame, TTSTextFrame):
            await self._handle_bot_text(frame)
        elif isinstance(frame, FunctionCallInProgressFrame):
            await self._send_message(
                {
                    "type": "rtf-function-call-start",
                    "payload": {
                        "function_name": frame.function_name,
                        "tool_call_id": frame.tool_call_id,
                    },
                }
            )
        elif isinstance(frame, FunctionCallResultFrame):
            await self._send_message(
                {
                    "type": "rtf-function-call-end",
                    "payload": {
                        "function_name": frame.function_name,
                        "tool_call_id": frame.tool_call_id,
                        # Only a missing result maps to None; falsy results
                        # such as 0, False or "" are still reported.
                        "result": None if frame.result is None else str(frame.result),
                    },
                }
            )

    @staticmethod
    def _transcription_message(frame, final: bool) -> dict:
        """Build the ``rtf-user-transcription`` message for a transcription frame."""
        return {
            "type": "rtf-user-transcription",
            "payload": {
                "text": frame.text,
                "final": final,
                "user_id": frame.user_id,
                "timestamp": frame.timestamp,
            },
        }

    async def _handle_bot_text(self, frame):
        """Send bot TTS text, delaying it until its pts when one is set."""
        message = {
            "type": "rtf-bot-text",
            "payload": {
                "text": frame.text,
            },
        }

        # No pts, send immediately
        if frame.pts is None:
            await self._send_message(message)
            return

        # Initialize timing reference on first pts frame
        if self._pts_start_time is None:
            self._pts_start_time = frame.pts
            self._clock_start_time = time.monotonic()

        await self._ensure_clock_task()
        # frame.id breaks ties between equal pts values so the queue never
        # has to compare the message dicts.
        await self._clock_queue.put((frame.pts, frame.id, message))

    async def _send_message(self, message: dict):
        """Send message via WebSocket, handling errors gracefully."""
        try:
            await self._ws_sender(message)
        except Exception as e:
            # Log but don't fail - feedback is non-critical
            logger.debug(f"Failed to send real-time feedback message: {e}")
api.services.pipecat.service_factory import ( create_llm_service, create_stt_service, @@ -38,6 +39,7 @@ from api.services.pipecat.transport_setup import ( create_vonage_transport, create_webrtc_transport, ) +from api.services.pipecat.ws_sender_registry import get_ws_sender from api.services.telephony.stasis_rtp_connection import StasisRTPConnection from api.services.workflow.dto import ReactFlowDTO from api.services.workflow.pipecat_engine import PipecatEngine @@ -564,6 +566,12 @@ async def _run_pipeline( # Create pipeline task with audio configuration task = create_pipeline_task(pipeline, workflow_run_id, audio_config) + # Add real-time feedback observer if WebSocket sender is available + ws_sender = get_ws_sender(workflow_run_id) + if ws_sender: + feedback_observer = RealtimeFeedbackObserver(ws_sender=ws_sender) + task.add_observer(feedback_observer) + # Now set the task on the engine engine.set_task(task) diff --git a/api/services/pipecat/ws_sender_registry.py b/api/services/pipecat/ws_sender_registry.py new file mode 100644 index 0000000..5b62d8f --- /dev/null +++ b/api/services/pipecat/ws_sender_registry.py @@ -0,0 +1,28 @@ +"""Registry to store WebSocket senders by workflow_run_id. + +This allows the pipeline observer to send messages back through +the signaling WebSocket without passing the WebSocket directly. 
from typing import Awaitable, Callable, Dict, Optional

# Maps workflow_run_id -> async callable that sends one message dict.
# Plain module-level dict: at most one sender is kept per workflow run.
_ws_senders: Dict[int, Callable[[dict], Awaitable[None]]] = {}


def register_ws_sender(
    workflow_run_id: int, sender: Callable[[dict], Awaitable[None]]
) -> None:
    """Register a WebSocket sender for a workflow run.

    Any sender previously registered for the same run is replaced.
    """
    _ws_senders[workflow_run_id] = sender


def unregister_ws_sender(
    workflow_run_id: int,
    sender: Optional[Callable[[dict], Awaitable[None]]] = None,
) -> None:
    """Unregister a WebSocket sender for a workflow run.

    Args:
        workflow_run_id: Run whose sender should be removed.
        sender: Optional guard. When given, the entry is removed only if it
            is this exact sender, so cleanup of an old connection cannot
            remove a sender that a newer connection registered for the same
            run. When omitted, the entry is removed unconditionally.

    Unknown run ids are ignored.
    """
    if sender is not None and _ws_senders.get(workflow_run_id) is not sender:
        return
    _ws_senders.pop(workflow_run_id, None)


def get_ws_sender(
    workflow_run_id: int,
) -> Optional[Callable[[dict], Awaitable[None]]]:
    """Get the WebSocket sender for a workflow run, or None if none is registered."""
    return _ws_senders.get(workflow_run_id)
+ {/* Main content - 2/3 width when panel visible, full width otherwise */} +
+
+ + + Call Voice Agent + - - {isCompleted && checkingForRecording ? ( -
- -
-

Processing your call

-

Fetching transcript and recording...

-
-
- ) : ( - <> - + + {isCompleted && checkingForRecording ? ( +
+ +
+

Processing your call

+

Fetching transcript and recording...

+
+
+ ) : ( + <> + - - - )} -
+ + + )} +
-
+
+
+ + {/* Show transcript panel */} +
+ +
+
{ + // Function call message - centered + if (msg.type === 'function-call') { + return ( +
+
+ {msg.status === 'running' ? ( + + ) : ( + + )} + + {msg.functionName}() + + {msg.status === 'completed' && ( + + )} +
+
+ ); + } + + const isUser = msg.type === 'user-transcription'; + + // User messages on right, bot messages on left + return ( +
+
+
{msg.text}
+ {!msg.final && ( +
+ speaking... +
+ )} +
+
+ ); +}; + +export const RealtimeFeedbackPanel = ({ + messages, + isVisible, + isCallActive, + isCallCompleted +}: RealtimeFeedbackPanelProps) => { + const scrollRef = useRef(null); + + // Auto-scroll to bottom when new messages arrive + useEffect(() => { + if (scrollRef.current) { + scrollRef.current.scrollTop = scrollRef.current.scrollHeight; + } + }, [messages]); + + if (!isVisible) return null; + + return ( +
+ {/* Header */} +
+
+ + Live Transcript +
+ {isCallActive ? ( + <> + + Live + + ) : isCallCompleted ? ( + <> + + Ended + + ) : ( + <> + + Ready + + )} +
+
+
+ + {/* Messages */} +
+ {messages.length === 0 ? ( +
+ +

No messages yet

+

+ {isCallActive + ? "Start speaking to see the transcript" + : "Start the call to begin the conversation" + } +

+
+ ) : ( +
+ {messages.map((msg) => ( + + ))} +
+ )} +
+ + {/* Footer with message count */} + {messages.length > 0 && ( +
+ {messages.filter(m => m.type !== 'function-call').length} messages +
+ )} +
+ ); +}; diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/index.ts b/ui/src/app/workflow/[workflowId]/run/[runId]/components/index.ts index 98f3014..6f94ffc 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/index.ts +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/index.ts @@ -2,4 +2,5 @@ export * from './ApiKeyErrorDialog'; export * from './AudioControls'; export * from './ConnectionStatus'; export * from './ContextDisplay'; +export * from './RealtimeFeedbackPanel'; export * from './WorkflowConfigErrorDialog' diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx index 8c70517..38c0749 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx @@ -15,6 +15,16 @@ interface UseWebSocketRTCProps { initialContextVariables?: Record | null; } +export interface FeedbackMessage { + id: string; + type: 'user-transcription' | 'bot-text' | 'function-call'; + text: string; + final?: boolean; + timestamp: string; + functionName?: string; + status?: 'running' | 'completed'; +} + export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initialContextVariables }: UseWebSocketRTCProps) => { const [connectionStatus, setConnectionStatus] = useState<'idle' | 'connecting' | 'connected' | 'failed'>('idle'); const [connectionActive, setConnectionActive] = useState(false); @@ -24,6 +34,7 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia const [workflowConfigModalOpen, setWorkflowConfigModalOpen] = useState(false); const [workflowConfigError, setWorkflowConfigError] = useState(null); const [isStarting, setIsStarting] = useState(false); + const [feedbackMessages, setFeedbackMessages] = useState([]); const initialContext = initialContextVariables || {}; const { @@ -271,6 +282,105 @@ export 
const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia } break; + case 'rtf-user-transcription': { + const transcription = message.payload; + setFeedbackMessages(prev => { + // Mark last bot message as final (user started speaking) + const withBotFinalized = prev.map((m, i) => + i === prev.length - 1 && m.type === 'bot-text' && !m.final + ? { ...m, final: true } + : m + ); + + // For interim transcriptions, replace the last interim + if (!transcription.final) { + const withoutLastInterim = withBotFinalized.filter( + m => !(m.type === 'user-transcription' && !m.final) + ); + return [...withoutLastInterim, { + id: `user-${Date.now()}`, + type: 'user-transcription', + text: transcription.text, + final: false, + timestamp: new Date().toISOString(), + }]; + } + // For final transcriptions, replace interim with final + const withoutInterim = withBotFinalized.filter( + m => !(m.type === 'user-transcription' && !m.final) + ); + return [...withoutInterim, { + id: `user-${Date.now()}`, + type: 'user-transcription', + text: transcription.text, + final: true, + timestamp: new Date().toISOString(), + }]; + }); + break; + } + + case 'rtf-bot-text': { + // TTS text comes as sentences/phrases, concatenate with space + setFeedbackMessages(prev => { + const last = prev[prev.length - 1]; + if (last && last.type === 'bot-text' && !last.final) { + // Append to existing bot message with space if needed + const existingText = last.text; + const newText = message.payload.text; + // Add space between chunks if previous doesn't end with space + // and new doesn't start with space or punctuation + const needsSpace = existingText.length > 0 && + !existingText.endsWith(' ') && + !newText.startsWith(' ') && + !/^[.,!?;:]/.test(newText); + return [ + ...prev.slice(0, -1), + { ...last, text: existingText + (needsSpace ? 
' ' : '') + newText } + ]; + } + // Start new bot message + return [...prev, { + id: `bot-${Date.now()}`, + type: 'bot-text', + text: message.payload.text, + final: false, + timestamp: new Date().toISOString(), + }]; + }); + break; + } + + case 'rtf-function-call-start': { + const { function_name, tool_call_id } = message.payload; + setFeedbackMessages(prev => { + // Check if we already have this function call + const existingId = `func-${tool_call_id}`; + if (prev.some(msg => msg.id === existingId)) { + return prev; + } + return [...prev, { + id: existingId, + type: 'function-call', + text: function_name, + functionName: function_name, + status: 'running', + timestamp: new Date().toISOString(), + }]; + }); + break; + } + + case 'rtf-function-call-end': { + const { tool_call_id, result } = message.payload; + setFeedbackMessages(prev => prev.map(msg => + msg.id === `func-${tool_call_id}` + ? { ...msg, status: 'completed' as const, text: result || msg.text } + : msg + )); + break; + } + default: logger.warn('Unknown message type:', message.type); } @@ -505,6 +615,7 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia stop, isStarting, initialContext, - getAudioInputDevices + getAudioInputDevices, + feedbackMessages, }; };