diff --git a/api/routes/workflow.py b/api/routes/workflow.py
index 6ce98a1..f8e47eb 100644
--- a/api/routes/workflow.py
+++ b/api/routes/workflow.py
@@ -611,6 +611,7 @@ async def get_workflow_run(
"initial_context": run.initial_context,
"gathered_context": run.gathered_context,
"call_type": run.call_type,
+ "logs": run.logs,
}
diff --git a/api/schemas/workflow.py b/api/schemas/workflow.py
index 5696e8b..f5c338c 100644
--- a/api/schemas/workflow.py
+++ b/api/schemas/workflow.py
@@ -20,3 +20,4 @@ class WorkflowRunResponseSchema(BaseModel):
initial_context: dict | None = None
gathered_context: dict | None = None
call_type: CallType
+ logs: Dict[str, Any] | None = None
diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py
index 6a654ee..f96c36f 100644
--- a/api/services/pipecat/event_handlers.py
+++ b/api/services/pipecat/event_handlers.py
@@ -4,8 +4,9 @@ from api.db import db_client
from api.enums import WorkflowRunState
from api.services.campaign.call_dispatcher import campaign_call_dispatcher
from api.services.pipecat.audio_config import AudioConfig
-from api.services.pipecat.audio_transcript_buffers import (
+from api.services.pipecat.in_memory_buffers import (
InMemoryAudioBuffer,
+ InMemoryLogsBuffer,
InMemoryTranscriptBuffer,
)
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
@@ -80,6 +81,7 @@ def register_task_event_handler(
audio_buffer: AudioBufferProcessor,
in_memory_audio_buffer: InMemoryAudioBuffer,
in_memory_transcript_buffer: InMemoryTranscriptBuffer,
+ in_memory_logs_buffer: InMemoryLogsBuffer,
pipeline_metrics_aggregator: PipelineMetricsAggregator,
):
@task.event_handler("on_pipeline_started")
@@ -185,6 +187,22 @@ def register_task_event_handler(
state=WorkflowRunState.COMPLETED.value,
)
+ # Save real-time feedback logs to workflow run
+ if not in_memory_logs_buffer.is_empty:
+ try:
+ feedback_events = in_memory_logs_buffer.get_events()
+ await db_client.update_workflow_run(
+ run_id=workflow_run_id,
+ logs={"realtime_feedback_events": feedback_events},
+ )
+ logger.debug(
+ f"Saved {len(feedback_events)} feedback events to workflow run logs"
+ )
+ except Exception as e:
+ logger.error(f"Error saving realtime feedback logs: {e}", exc_info=True)
+ else:
+ logger.debug("Logs buffer is empty, skipping save")
+
# Release concurrent slot for campaign calls
if workflow_run and workflow_run.campaign_id:
await campaign_call_dispatcher.release_call_slot(workflow_run_id)
diff --git a/api/services/pipecat/audio_transcript_buffers.py b/api/services/pipecat/in_memory_buffers.py
similarity index 76%
rename from api/services/pipecat/audio_transcript_buffers.py
rename to api/services/pipecat/in_memory_buffers.py
index d087ad8..c4274f4 100644
--- a/api/services/pipecat/audio_transcript_buffers.py
+++ b/api/services/pipecat/in_memory_buffers.py
@@ -2,6 +2,7 @@ import asyncio
import re
import tempfile
import wave
+from datetime import UTC, datetime
from typing import List
from loguru import logger
@@ -120,3 +121,41 @@ class InMemoryTranscriptBuffer:
if self._USER_SPEECH_RE.match(line):
return True
return False
+
+
+class InMemoryLogsBuffer:
+ """Buffer real-time feedback events in memory during a call, then save to workflow run logs."""
+
+ def __init__(self, workflow_run_id: int):
+ self._workflow_run_id = workflow_run_id
+ self._events: List[dict] = []
+ self._turn_counter = 0
+
+ async def append(self, event: dict):
+ """Append a feedback event to the buffer with timestamp."""
+ # Add timestamp and turn tracking
+ timestamped_event = {
+ **event,
+ "timestamp": datetime.now(UTC).isoformat(),
+ "turn": self._turn_counter,
+ }
+ self._events.append(timestamped_event)
+ logger.trace(
+ f"Appended event {event.get('type')} to logs buffer for workflow {self._workflow_run_id}"
+ )
+
+ def increment_turn(self):
+ """Increment turn counter (called on user transcription completion)."""
+ self._turn_counter += 1
+ logger.trace(
+ f"Incremented turn counter to {self._turn_counter} for workflow {self._workflow_run_id}"
+ )
+
+ def get_events(self) -> List[dict]:
+ """Get all events for final storage."""
+ return self._events
+
+ @property
+ def is_empty(self) -> bool:
+ """Check if the buffer is empty."""
+ return len(self._events) == 0
diff --git a/api/services/pipecat/realtime_feedback_observer.py b/api/services/pipecat/realtime_feedback_observer.py
index bf5275d..94ce896 100644
--- a/api/services/pipecat/realtime_feedback_observer.py
+++ b/api/services/pipecat/realtime_feedback_observer.py
@@ -1,19 +1,27 @@
"""Real-time feedback observer for sending pipeline events to the frontend.
This observer watches pipeline frames and sends relevant events (transcriptions,
-bot text) over WebSocket to provide real-time feedback in the UI.
+bot text, function calls, TTFB metrics) over WebSocket to provide real-time
+feedback in the UI.
For frames with presentation timestamps (pts), like TTSTextFrame, we respect
the timing by queuing them and sending at the appropriate time, similar to
how base_output.py handles timed frames.
+
+Note: Node transition events are sent directly from PipecatEngine.set_node()
+rather than being observed here, to ensure precise timing at the moment of
+node changes.
"""
import asyncio
import time
-from typing import Awaitable, Callable, Optional, Set
+from typing import TYPE_CHECKING, Awaitable, Callable, Optional, Set
from loguru import logger
+if TYPE_CHECKING:
+ from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer
+
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
@@ -21,33 +29,46 @@ from pipecat.frames.frames import (
FunctionCallResultFrame,
InterimTranscriptionFrame,
InterruptionFrame,
+ MetricsFrame,
StopFrame,
TranscriptionFrame,
TTSTextFrame,
)
+from pipecat.metrics.metrics import TTFBMetricsData
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection
from pipecat.utils.time import nanoseconds_to_seconds
class RealtimeFeedbackObserver(BaseObserver):
- """Observer that sends real-time transcription and bot response events via WebSocket.
+ """Observer that sends real-time transcription, bot response, and metrics via WebSocket.
+
+ Observes pipeline frames and sends events for:
+ - User transcriptions (interim and final)
+ - Bot TTS text (with pts-based timing)
+ - Function calls (start/end)
+ - TTFB metrics (LLM generation time only - filters to processors containing "LLM")
For frames with pts (presentation timestamp), we queue them and send at the
appropriate time to sync with audio playback.
+
+ Note: Node transitions are handled by PipecatEngine.set_node() callback.
"""
def __init__(
self,
ws_sender: Callable[[dict], Awaitable[None]],
+ logs_buffer: Optional["InMemoryLogsBuffer"] = None,
):
"""
Args:
ws_sender: Async function to send messages over WebSocket.
Expected signature: async def send(message: dict) -> None
+ logs_buffer: Optional InMemoryLogsBuffer to persist events for post-call analysis.
"""
super().__init__()
self._ws_sender = ws_sender
+ self._logs_buffer = logs_buffer
self._frames_seen: Set[str] = set()
# Clock/timing for pts-based frames (similar to base_output.py)
@@ -126,6 +147,8 @@ class RealtimeFeedbackObserver(BaseObserver):
frame = data.frame
frame_direction = data.direction
+ logger.trace(f"{self} Received Frame: {frame} Direction: {frame_direction}")
+
# Handle pipeline termination - stop clock task
if isinstance(frame, (EndFrame, CancelFrame, StopFrame)):
await self._cancel_clock_task()
@@ -167,6 +190,9 @@ class RealtimeFeedbackObserver(BaseObserver):
},
}
)
+ # Increment turn counter on final user transcription
+ if self._logs_buffer:
+ self._logs_buffer.increment_turn()
# Handle bot TTS text - respect pts timing
elif isinstance(frame, TTSTextFrame):
message = {
@@ -217,11 +243,36 @@ class RealtimeFeedbackObserver(BaseObserver):
},
}
)
+ # Handle TTFB metrics - capture LLM generation time only
+ elif isinstance(frame, MetricsFrame):
+ # Check if this MetricsFrame contains TTFB data from an LLM processor
+ for metric_data in frame.data:
+ if isinstance(metric_data, TTFBMetricsData):
+ # Only send TTFB if it's from an LLM processor
+ if metric_data.processor and "LLM" in metric_data.processor:
+ await self._send_message(
+ {
+ "type": "rtf-ttfb-metric",
+ "payload": {
+ "ttfb_seconds": metric_data.value,
+ "processor": metric_data.processor,
+ "model": metric_data.model,
+ },
+ }
+ )
async def _send_message(self, message: dict):
- """Send message via WebSocket, handling errors gracefully."""
+ """Send message via WebSocket AND append to logs buffer, handling errors gracefully."""
+ # Send via WebSocket
try:
await self._ws_sender(message)
except Exception as e:
# Log but don't fail - feedback is non-critical
logger.debug(f"Failed to send real-time feedback message: {e}")
+
+ # Also append to logs buffer
+ if self._logs_buffer:
+ try:
+ await self._logs_buffer.append(message)
+ except Exception as e:
+ logger.error(f"Failed to append to logs buffer: {e}")
diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py
index b385a2c..f056554 100644
--- a/api/services/pipecat/run_pipeline.py
+++ b/api/services/pipecat/run_pipeline.py
@@ -14,6 +14,7 @@ from api.services.pipecat.event_handlers import (
register_transcript_handler,
register_transport_event_handlers,
)
+from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer
from api.services.pipecat.pipeline_builder import (
build_pipeline,
create_pipeline_components,
@@ -467,11 +468,45 @@ async def _run_pipeline(
ReactFlowDTO.model_validate(workflow.workflow_definition_with_fallback)
)
+ # Create in-memory logs buffer early so it can be used by engine callbacks
+ in_memory_logs_buffer = InMemoryLogsBuffer(workflow_run_id)
+
+ # Create node transition callback if WebSocket sender is available
+ node_transition_callback = None
+ ws_sender = get_ws_sender(workflow_run_id)
+ if ws_sender:
+
+ async def send_node_transition(
+ node_name: str, previous_node: Optional[str]
+ ) -> None:
+ """Send node transition event via WebSocket AND log to buffer."""
+ message = {
+ "type": "rtf-node-transition",
+ "payload": {
+ "node_name": node_name,
+ "previous_node": previous_node,
+ },
+ }
+ # Send via WebSocket
+ try:
+ await ws_sender(message)
+ except Exception as e:
+ logger.debug(f"Failed to send node transition via WebSocket: {e}")
+
+ # Log to in-memory buffer
+ try:
+ await in_memory_logs_buffer.append(message)
+ except Exception as e:
+ logger.error(f"Failed to append node transition to logs buffer: {e}")
+
+ node_transition_callback = send_node_transition
+
engine = PipecatEngine(
llm=llm,
workflow=workflow_graph,
call_context_vars=merged_call_context_vars,
workflow_run_id=workflow_run_id,
+ node_transition_callback=node_transition_callback,
)
# Create pipeline components with audio configuration and engine
@@ -566,12 +601,6 @@ async def _run_pipeline(
# Create pipeline task with audio configuration
task = create_pipeline_task(pipeline, workflow_run_id, audio_config)
- # Add real-time feedback observer if WebSocket sender is available
- ws_sender = get_ws_sender(workflow_run_id)
- if ws_sender:
- feedback_observer = RealtimeFeedbackObserver(ws_sender=ws_sender)
- task.add_observer(feedback_observer)
-
# Now set the task on the engine
engine.set_task(task)
@@ -590,6 +619,15 @@ async def _run_pipeline(
)
)
+ # Add real-time feedback observer if WebSocket sender is available
+ # Note: ws_sender was already fetched earlier for node_transition_callback
+ if ws_sender:
+ feedback_observer = RealtimeFeedbackObserver(
+ ws_sender=ws_sender,
+ logs_buffer=in_memory_logs_buffer,
+ )
+ task.add_observer(feedback_observer)
+
register_task_event_handler(
workflow_run_id,
engine,
@@ -598,6 +636,7 @@ async def _run_pipeline(
audio_buffer,
in_memory_audio_buffer,
in_memory_transcript_buffer,
+ in_memory_logs_buffer,
pipeline_metrics_aggregator,
)
diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py
index b8bd59f..6ebc2ca 100644
--- a/api/services/workflow/pipecat_engine.py
+++ b/api/services/workflow/pipecat_engine.py
@@ -62,6 +62,9 @@ class PipecatEngine:
call_context_vars: dict,
audio_buffer: Optional["AudioBuffer"] = None,
workflow_run_id: Optional[int] = None,
+ node_transition_callback: Optional[
+ Callable[[str, Optional[str]], Awaitable[None]]
+ ] = None,
):
self.task = task
self.llm = llm
@@ -71,6 +74,7 @@ class PipecatEngine:
self._call_context_vars = call_context_vars
self._audio_buffer = audio_buffer
self._workflow_run_id = workflow_run_id
+ self._node_transition_callback = node_transition_callback
self._initialized = False
self._client_disconnected = False
self._call_disposed = False
@@ -359,9 +363,20 @@ class PipecatEngine:
f"Executing node: name: {node.name} is_static: {node.is_static} allow_interrupt: {node.allow_interrupt} is_end: {node.is_end}"
)
+ # Track previous node for transition event
+ previous_node_name = self._current_node.name if self._current_node else None
+
# Set current node for all nodes (including static ones) so STT mute filter works
self._current_node = node
+ # Send node transition event if callback is provided
+ if self._node_transition_callback:
+ try:
+ await self._node_transition_callback(node.name, previous_node_name)
+ except Exception as e:
+ # Log but don't fail - feedback is non-critical
+ logger.debug(f"Failed to send node transition event: {e}")
+
# Handle start nodes
if node.is_start:
await self._handle_start_node(node)
@@ -693,5 +708,3 @@ class PipecatEngine:
and not self._user_response_timeout_task.done()
):
self._user_response_timeout_task.cancel()
-
- # Note: Native VoicemailDetector cleanup is handled by the pipeline
diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/BrowserCall.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/BrowserCall.tsx
index 4bfd490..8bd9d20 100644
--- a/ui/src/app/workflow/[workflowId]/run/[runId]/BrowserCall.tsx
+++ b/ui/src/app/workflow/[workflowId]/run/[runId]/BrowserCall.tsx
@@ -9,7 +9,7 @@ import {
ApiKeyErrorDialog,
AudioControls,
ConnectionStatus,
- RealtimeFeedbackPanel,
+ RealtimeFeedback,
WorkflowConfigErrorDialog
} from "./components";
import { useWebSocketRTC } from "./hooks";
@@ -142,9 +142,9 @@ const BrowserCall = ({ workflowId, workflowRunId, accessToken, initialContextVar
{/* Show transcript panel */}
-
diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx
new file mode 100644
index 0000000..5cc5e67
--- /dev/null
+++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx
@@ -0,0 +1,161 @@
+'use client';
+
+import { FeedbackMessage } from '../hooks/useWebSocketRTC';
+import { processLiveMessages, processTranscriptEvents, TranscriptEvent } from '../utils/processTranscriptEvents';
+import { UnifiedTranscript } from './UnifiedTranscript';
+
+// Historical log event format from the backend
+interface RealtimeFeedbackEvent {
+ type: string;
+ payload: {
+ text?: string;
+ final?: boolean;
+ user_id?: string;
+ timestamp?: string;
+ function_name?: string;
+ tool_call_id?: string;
+ result?: string;
+ node_name?: string;
+ previous_node?: string;
+ ttfb_seconds?: number;
+ processor?: string;
+ model?: string;
+ };
+ timestamp: string;
+ turn: number;
+}
+
+export interface WorkflowRunLogs {
+ realtime_feedback_events?: RealtimeFeedbackEvent[];
+}
+
+// Props for live mode (WebSocket messages)
+interface LiveModeProps {
+ mode: 'live';
+ messages: FeedbackMessage[];
+ isCallActive: boolean;
+ isCallCompleted: boolean;
+}
+
+// Props for historical mode (API logs)
+interface HistoricalModeProps {
+ mode: 'historical';
+ logs: WorkflowRunLogs | null;
+}
+
+type RealtimeFeedbackProps = LiveModeProps | HistoricalModeProps;
+
+/**
+ * Convert backend log events to unified TranscriptEvent format
+ */
+function convertLogEventsToTranscriptEvents(events: RealtimeFeedbackEvent[]): TranscriptEvent[] {
+ return events.map(event => {
+ let type: TranscriptEvent['type'];
+ let status: TranscriptEvent['status'];
+
+ switch (event.type) {
+ case 'rtf-user-transcription':
+ type = 'user-transcription';
+ break;
+ case 'rtf-bot-text':
+ type = 'bot-text';
+ break;
+ case 'rtf-function-call-start':
+ type = 'function-call';
+ status = 'running';
+ break;
+ case 'rtf-function-call-end':
+ type = 'function-call';
+ status = 'completed';
+ break;
+ case 'rtf-node-transition':
+ type = 'node-transition';
+ break;
+ case 'rtf-ttfb-metric':
+ type = 'ttfb-metric';
+ break;
+ default:
+ type = 'bot-text';
+ }
+
+ return {
+ type,
+ text: event.payload.text || event.payload.result || event.payload.function_name || event.payload.node_name || '',
+ final: event.payload.final,
+ timestamp: event.timestamp,
+ turn: event.turn,
+ functionName: event.payload.function_name,
+ status,
+ nodeName: event.payload.node_name,
+ previousNode: event.payload.previous_node,
+ ttfbSeconds: event.payload.ttfb_seconds,
+ processor: event.payload.processor,
+ model: event.payload.model,
+ };
+ });
+}
+
+/**
+ * Convert live WebSocket messages to unified TranscriptEvent format
+ */
+function convertLiveMessagesToTranscriptEvents(messages: FeedbackMessage[]): TranscriptEvent[] {
+ return messages.map(msg => ({
+ type: msg.type,
+ text: msg.text,
+ final: msg.final,
+ timestamp: msg.timestamp,
+ functionName: msg.functionName,
+ status: msg.status,
+ nodeName: msg.nodeName,
+ previousNode: msg.previousNode,
+ ttfbSeconds: msg.ttfbSeconds,
+ processor: msg.processor,
+ model: msg.model,
+ }));
+}
+
+/**
+ * Single unified component that handles both live WebSocket messages
+ * and historical logs from the API.
+ */
+export const RealtimeFeedback = (props: RealtimeFeedbackProps) => {
+ if (props.mode === 'historical') {
+ // Historical mode - process logs from API
+ const rawEvents = props.logs?.realtime_feedback_events;
+ const messages = rawEvents
+ ? processTranscriptEvents(convertLogEventsToTranscriptEvents(rawEvents))
+ : [];
+
+ return (
+
+ );
+ }
+
+ // Live mode - process WebSocket messages (optimized - messages already accumulated)
+ const { messages, isCallActive, isCallCompleted } = props;
+ const status = isCallActive ? 'live' : isCallCompleted ? 'ended' : 'ready';
+ const processedMessages = processLiveMessages(convertLiveMessagesToTranscriptEvents(messages));
+
+ return (
+
+ );
+};
diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedbackPanel.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedbackPanel.tsx
deleted file mode 100644
index 5409599..0000000
--- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedbackPanel.tsx
+++ /dev/null
@@ -1,152 +0,0 @@
-"use client";
-
-import { Loader2, MessageSquare, Mic, MicOff, Wrench } from "lucide-react";
-import { useEffect, useRef } from "react";
-
-import { cn } from "@/lib/utils";
-
-import { FeedbackMessage } from "../hooks/useWebSocketRTC";
-
-interface RealtimeFeedbackPanelProps {
- messages: FeedbackMessage[];
- isVisible: boolean;
- isCallActive: boolean;
- isCallCompleted: boolean;
-}
-
-const MessageItem = ({ msg }: { msg: FeedbackMessage }) => {
- // Function call message - centered
- if (msg.type === 'function-call') {
- return (
-
-
- {msg.status === 'running' ? (
-
- ) : (
-
- )}
-
- {msg.functionName}()
-
- {msg.status === 'completed' && (
- ✓
- )}
-
-
- );
- }
-
- const isUser = msg.type === 'user-transcription';
-
- // User messages on right, bot messages on left
- return (
-
-
-
{msg.text}
- {!msg.final && (
-
- speaking...
-
- )}
-
-
- );
-};
-
-export const RealtimeFeedbackPanel = ({
- messages,
- isVisible,
- isCallActive,
- isCallCompleted
-}: RealtimeFeedbackPanelProps) => {
- const scrollRef = useRef
(null);
-
- // Auto-scroll to bottom when new messages arrive
- useEffect(() => {
- if (scrollRef.current) {
- scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
- }
- }, [messages]);
-
- if (!isVisible) return null;
-
- return (
-
- {/* Header */}
-
-
-
-
Live Transcript
-
- {isCallActive ? (
- <>
-
- Live
- >
- ) : isCallCompleted ? (
- <>
-
- Ended
- >
- ) : (
- <>
-
- Ready
- >
- )}
-
-
-
-
- {/* Messages */}
-
- {messages.length === 0 ? (
-
-
-
No messages yet
-
- {isCallActive
- ? "Start speaking to see the transcript"
- : "Start the call to begin the conversation"
- }
-
-
- ) : (
-
- {messages.map((msg) => (
-
- ))}
-
- )}
-
-
- {/* Footer with message count */}
- {messages.length > 0 && (
-
- {messages.filter(m => m.type !== 'function-call').length} messages
-
- )}
-
- );
-};
diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx
new file mode 100644
index 0000000..635cb07
--- /dev/null
+++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx
@@ -0,0 +1,96 @@
+"use client";
+
+import { useEffect, useRef } from "react";
+
+import { ProcessedMessage } from "../utils/processTranscriptEvents";
+import { TranscriptContainer } from "./shared/TranscriptContainer";
+import { TranscriptEmptyState } from "./shared/TranscriptEmptyState";
+import { TranscriptMessage, TranscriptMessageData } from "./shared/TranscriptMessage";
+
+interface UnifiedTranscriptProps {
+ messages: ProcessedMessage[];
+ status: 'ready' | 'live' | 'ended';
+ title?: string;
+ autoScroll?: boolean;
+ emptyState?: {
+ title: string;
+ subtitle: string;
+ };
+}
+
+export const UnifiedTranscript = ({
+ messages,
+ status,
+ title,
+ autoScroll = false,
+ emptyState
+}: UnifiedTranscriptProps) => {
+ const scrollRef = useRef(null);
+
+ // Auto-scroll to bottom when new messages arrive (for live mode)
+ useEffect(() => {
+ if (autoScroll && scrollRef.current) {
+ scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
+ }
+ }, [messages, autoScroll]);
+
+ // Calculate message count (exclude system messages like function calls, node transitions, TTFB)
+ const messageCount = messages.filter(
+ m => m.type === 'user-transcription' || m.type === 'bot-text'
+ ).length;
+
+ // Convert ProcessedMessage to TranscriptMessageData
+ const transcriptMessages: TranscriptMessageData[] = messages.map(msg => ({
+ id: msg.id,
+ type: msg.type,
+ text: msg.text,
+ final: msg.final,
+ functionName: msg.functionName,
+ status: msg.status,
+ nodeName: msg.nodeName,
+ ttfbSeconds: msg.ttfbSeconds,
+ }));
+
+ // Default empty state
+ const defaultEmptyState = {
+ title: status === 'live' ? "No messages yet" : "No conversation recorded",
+ subtitle: status === 'live'
+ ? "Start speaking to see the transcript"
+ : "Real-time feedback events were not captured"
+ };
+
+ const emptyStateToShow = emptyState || defaultEmptyState;
+
+ return (
+ 0 ? messageCount : undefined}
+ >
+
+ {messages.length === 0 ? (
+
+ ) : (
+
+ {transcriptMessages.map((msg, index) => {
+ // Skip standalone TTFB metrics (they're rendered inline with bot text)
+ if (msg.type === 'ttfb-metric') {
+ return null;
+ }
+ return (
+
+ );
+ })}
+
+ )}
+
+
+ );
+};
diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/index.ts b/ui/src/app/workflow/[workflowId]/run/[runId]/components/index.ts
index 6f94ffc..dedd126 100644
--- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/index.ts
+++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/index.ts
@@ -2,5 +2,5 @@ export * from './ApiKeyErrorDialog';
export * from './AudioControls';
export * from './ConnectionStatus';
export * from './ContextDisplay';
-export * from './RealtimeFeedbackPanel';
-export * from './WorkflowConfigErrorDialog'
+export * from './RealtimeFeedback';
+export * from './WorkflowConfigErrorDialog';
diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptContainer.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptContainer.tsx
new file mode 100644
index 0000000..f62b752
--- /dev/null
+++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptContainer.tsx
@@ -0,0 +1,72 @@
+'use client';
+
+import { MessageSquare, Mic, MicOff } from 'lucide-react';
+import { ReactNode } from 'react';
+
+import { cn } from '@/lib/utils';
+
+type CallStatus = 'ready' | 'live' | 'ended';
+
+interface TranscriptContainerProps {
+ title: string;
+ status: CallStatus;
+ children: ReactNode;
+ messageCount?: number;
+}
+
+const STATUS_CONFIG = {
+ ready: {
+ icon: MicOff,
+ label: 'Ready',
+ className: 'bg-muted text-muted-foreground',
+ },
+ live: {
+ icon: Mic,
+ label: 'Live',
+ className: 'bg-green-500/10 text-green-600 dark:text-green-400',
+ },
+ ended: {
+ icon: MicOff,
+ label: 'Ended',
+ className: 'bg-muted text-muted-foreground',
+ },
+};
+
+export function TranscriptContainer({
+ title,
+ status,
+ children,
+ messageCount
+}: TranscriptContainerProps) {
+ const statusConfig = STATUS_CONFIG[status];
+ const StatusIcon = statusConfig.icon;
+
+ return (
+
+ {/* Header */}
+
+
+
+
{title}
+
+
+ {statusConfig.label}
+
+
+
+
+ {/* Content */}
+ {children}
+
+ {/* Footer with message count */}
+ {messageCount !== undefined && messageCount > 0 && (
+
+ {messageCount} messages
+
+ )}
+
+ );
+}
diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptEmptyState.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptEmptyState.tsx
new file mode 100644
index 0000000..740e431
--- /dev/null
+++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptEmptyState.tsx
@@ -0,0 +1,20 @@
+'use client';
+
+import { MessageSquare } from 'lucide-react';
+
+interface TranscriptEmptyStateProps {
+ title: string;
+ subtitle: string;
+}
+
+export function TranscriptEmptyState({ title, subtitle }: TranscriptEmptyStateProps) {
+ return (
+
+
+
{title}
+
+ {subtitle}
+
+
+ );
+}
diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx
new file mode 100644
index 0000000..c41d4ee
--- /dev/null
+++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx
@@ -0,0 +1,110 @@
+'use client';
+
+import { Brain, GitBranch, Wrench } from 'lucide-react';
+
+import { cn } from '@/lib/utils';
+
+export interface TranscriptMessageData {
+ id: string;
+ type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric';
+ text: string;
+ final?: boolean;
+ functionName?: string;
+ nodeName?: string;
+ ttfbSeconds?: number;
+}
+
+interface TranscriptMessageProps {
+ message: TranscriptMessageData;
+ nextMessage?: TranscriptMessageData;
+}
+
+export function TranscriptMessage({ message, nextMessage }: TranscriptMessageProps) {
+ // Node transition - show as section divider
+ if (message.type === 'node-transition') {
+ return (
+
+
+
+
+
+ {message.nodeName}
+
+
+
+
+ );
+ }
+
+ // TTFB metric - don't render standalone, it'll be shown with bot messages and function calls
+ if (message.type === 'ttfb-metric') {
+ return null;
+ }
+
+ // Function call message - centered with TTFB if present
+ if (message.type === 'function-call') {
+ const ttfbMetric = nextMessage?.type === 'ttfb-metric' ? nextMessage : null;
+ return (
+
+ {/* Show TTFB metric above function call */}
+ {ttfbMetric && ttfbMetric.ttfbSeconds !== undefined && (
+
+
+ Reasoning Delay:
+ {(ttfbMetric.ttfbSeconds * 1000).toFixed(0)}ms
+
+ )}
+
+
+
+ {message.functionName}()
+
+
+
+ );
+ }
+
+ const isUser = message.type === 'user-transcription';
+ const isBot = message.type === 'bot-text';
+
+ // Check if next message is a TTFB metric (for bot messages)
+ const ttfbMetric = isBot && nextMessage?.type === 'ttfb-metric' ? nextMessage : null;
+
+ // User messages on right, bot messages on left
+ return (
+
+
+ {/* Show TTFB metric above bot messages */}
+ {ttfbMetric && ttfbMetric.ttfbSeconds !== undefined && (
+
+
+ Reasoning Delay:
+ {(ttfbMetric.ttfbSeconds * 1000).toFixed(0)}ms
+
+ )}
+
+
{message.text}
+ {!message.final && (
+
+ speaking...
+
+ )}
+
+
+
+ );
+}
diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx
index 38c0749..e923cd3 100644
--- a/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx
+++ b/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx
@@ -17,12 +17,19 @@ interface UseWebSocketRTCProps {
export interface FeedbackMessage {
id: string;
- type: 'user-transcription' | 'bot-text' | 'function-call';
+ type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric';
text: string;
final?: boolean;
timestamp: string;
functionName?: string;
status?: 'running' | 'completed';
+ // Node transition fields
+ nodeName?: string;
+ previousNode?: string;
+ // TTFB metric fields
+ ttfbSeconds?: number;
+ processor?: string;
+ model?: string;
}
export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initialContextVariables }: UseWebSocketRTCProps) => {
@@ -285,35 +292,26 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia
case 'rtf-user-transcription': {
const transcription = message.payload;
setFeedbackMessages(prev => {
- // Mark last bot message as final (user started speaking)
- const withBotFinalized = prev.map((m, i) =>
- i === prev.length - 1 && m.type === 'bot-text' && !m.final
- ? { ...m, final: true }
- : m
+ // Step 1: Finalize the last bot message (user started speaking)
+ const messagesWithBotFinalized = prev.map((msg, idx) => {
+ const isLastMessage = idx === prev.length - 1;
+ const isUnfinalizedBotMessage = msg.type === 'bot-text' && !msg.final;
+ return isLastMessage && isUnfinalizedBotMessage
+ ? { ...msg, final: true }
+ : msg;
+ });
+
+ // Step 2: Remove any previous interim transcription
+ const messagesWithoutInterim = messagesWithBotFinalized.filter(
+ msg => !(msg.type === 'user-transcription' && !msg.final)
);
- // For interim transcriptions, replace the last interim
- if (!transcription.final) {
- const withoutLastInterim = withBotFinalized.filter(
- m => !(m.type === 'user-transcription' && !m.final)
- );
- return [...withoutLastInterim, {
- id: `user-${Date.now()}`,
- type: 'user-transcription',
- text: transcription.text,
- final: false,
- timestamp: new Date().toISOString(),
- }];
- }
- // For final transcriptions, replace interim with final
- const withoutInterim = withBotFinalized.filter(
- m => !(m.type === 'user-transcription' && !m.final)
- );
- return [...withoutInterim, {
+ // Step 3: Add new transcription (interim or final)
+ return [...messagesWithoutInterim, {
id: `user-${Date.now()}`,
type: 'user-transcription',
text: transcription.text,
- final: true,
+ final: transcription.final,
timestamp: new Date().toISOString(),
}];
});
@@ -381,6 +379,33 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia
break;
}
+ case 'rtf-node-transition': {
+ const { node_name, previous_node } = message.payload;
+ setFeedbackMessages(prev => [...prev, {
+ id: `node-${Date.now()}`,
+ type: 'node-transition',
+ text: node_name,
+ nodeName: node_name,
+ previousNode: previous_node,
+ timestamp: new Date().toISOString(),
+ }]);
+ break;
+ }
+
+ case 'rtf-ttfb-metric': {
+ const { ttfb_seconds, processor, model } = message.payload;
+ setFeedbackMessages(prev => [...prev, {
+ id: `ttfb-${Date.now()}`,
+ type: 'ttfb-metric',
+ text: `${(ttfb_seconds * 1000).toFixed(0)}ms`,
+ ttfbSeconds: ttfb_seconds,
+ processor,
+ model,
+ timestamp: new Date().toISOString(),
+ }]);
+ break;
+ }
+
default:
logger.warn('Unknown message type:', message.type);
}
diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/page.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/page.tsx
index 0c055c4..27b82f6 100644
--- a/ui/src/app/workflow/[workflowId]/run/[runId]/page.tsx
+++ b/ui/src/app/workflow/[workflowId]/run/[runId]/page.tsx
@@ -6,6 +6,7 @@ import { useParams } from 'next/navigation';
import { useEffect, useRef, useState } from 'react';
import BrowserCall from '@/app/workflow/[workflowId]/run/[runId]/BrowserCall';
+import { RealtimeFeedback, WorkflowRunLogs } from '@/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback';
import WorkflowLayout from '@/app/workflow/WorkflowLayout';
import { getWorkflowRunApiV1WorkflowWorkflowIdRunsRunIdGet } from '@/client/sdk.gen';
import { MediaPreviewButtons, MediaPreviewDialog } from '@/components/MediaPreviewDialog';
@@ -23,6 +24,7 @@ interface WorkflowRunResponse {
recording_url: string | null;
initial_context: Record | null;
gathered_context: Record | null;
+ logs: WorkflowRunLogs | null;
}
function ContextDisplay({ title, context }: { title: string; context: Record | null }) {
@@ -116,6 +118,7 @@ export default function WorkflowRunPage() {
recording_url: response.data?.recording_url ?? null,
initial_context: response.data?.initial_context as Record | null ?? null,
gathered_context: response.data?.gathered_context as Record | null ?? null,
+ logs: response.data?.logs as WorkflowRunLogs | null ?? null,
});
};
fetchWorkflowRun();
@@ -147,8 +150,10 @@ export default function WorkflowRunPage() {
}
else if (workflowRun?.is_completed) {
returnValue = (
-
-
+
+ {/* Main content - 2/3 width */}
+
+
@@ -235,17 +240,23 @@ export default function WorkflowRunPage() {
-
+
+ {/* Transcript panel - 1/3 width */}
+
+
+
);
}
diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts b/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts
new file mode 100644
index 0000000..25a6f36
--- /dev/null
+++ b/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts
@@ -0,0 +1,147 @@
+/**
+ * Utility to process realtime feedback events into a unified transcript format.
+ * Used by both live WebSocket messages and post-call logs.
+ */
+
+export interface TranscriptEvent {
+ type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric';
+ text: string;
+ final?: boolean;
+ timestamp: string;
+ turn?: number;
+ functionName?: string;
+ status?: 'running' | 'completed';
+ nodeName?: string;
+ previousNode?: string;
+ ttfbSeconds?: number;
+ processor?: string;
+ model?: string;
+}
+
+export interface ProcessedMessage {
+ id: string;
+ type: TranscriptEvent['type'];
+ text: string;
+ final?: boolean;
+ timestamp: string;
+ functionName?: string;
+ status?: 'running' | 'completed';
+ nodeName?: string;
+ ttfbSeconds?: number;
+}
+
+/**
+ * Process transcript events (both live and historical).
+ * Combines consecutive bot-text by turn and associates TTFB metrics.
+ */
+export function processTranscriptEvents(events: TranscriptEvent[]): ProcessedMessage[] {
+ // Filter out interim transcriptions and function-call-start events
+ const filteredEvents = events.filter(event => {
+ if (event.type === 'user-transcription' && !event.final) return false;
+ if (event.type === 'function-call' && event.status === 'running') return false;
+ return true;
+ });
+
+ const processed: ProcessedMessage[] = [];
+ let currentBotText: { event: TranscriptEvent; text: string } | null = null;
+ let pendingTtfb: TranscriptEvent | null = null;
+
+ const flushBotText = () => {
+ if (!currentBotText) return;
+
+ processed.push(convertToProcessedMessage(currentBotText.event, currentBotText.text));
+
+ // Add the pending TTFB metric if it exists
+ if (pendingTtfb) {
+ processed.push(convertToProcessedMessage(pendingTtfb));
+ pendingTtfb = null;
+ }
+
+ currentBotText = null;
+ };
+
+ for (const event of filteredEvents) {
+ if (event.type === 'ttfb-metric') {
+ // Store TTFB to associate with the next bot-text or function-call
+ pendingTtfb = event;
+ } else if (event.type === 'bot-text') {
+ // Combine consecutive bot-text from the same turn
+ if (currentBotText && currentBotText.event.turn === event.turn) {
+ currentBotText.text = currentBotText.text + ' ' + event.text;
+ } else {
+ flushBotText();
+ currentBotText = { event, text: event.text };
+ }
+ } else {
+ // Handle other events (user-transcription, function-call, node-transition)
+ flushBotText();
+ processed.push(convertToProcessedMessage(event));
+
+ // Add pending TTFB after function calls
+ if (event.type === 'function-call' && pendingTtfb) {
+ processed.push(convertToProcessedMessage(pendingTtfb));
+ pendingTtfb = null;
+ }
+ }
+ }
+
+ // Flush any remaining bot text
+ flushBotText();
+
+ return processed;
+}
+
+/**
+ * Process live messages - optimized version.
+ *
+ * Optimizations rely on useWebSocketRTC.tsx already handling:
+ * - Bot text accumulation (consecutive chunks combined with spaces)
+ * - Interim transcription filtering (only final transcriptions kept)
+ * - Function call status (start events filtered, only completed kept)
+ *
+ * This function only needs to:
+ * - Associate TTFB metrics with the preceding bot-text or function-call
+ * - Convert to ProcessedMessage format
+ */
+export function processLiveMessages(messages: TranscriptEvent[]): ProcessedMessage[] {
+ const processed: ProcessedMessage[] = [];
+ let pendingTtfb: TranscriptEvent | null = null;
+
+ for (const msg of messages) {
+ if (msg.type === 'ttfb-metric') {
+ // Store TTFB to associate with next message
+ pendingTtfb = msg;
+ } else {
+ // Add the message
+ processed.push(convertToProcessedMessage(msg));
+
+ // Add pending TTFB after final bot-text or completed function calls
+ if ((msg.type === 'bot-text' && msg.final) ||
+ (msg.type === 'function-call' && msg.status === 'completed')) {
+ if (pendingTtfb) {
+ processed.push(convertToProcessedMessage(pendingTtfb));
+ pendingTtfb = null;
+ }
+ }
+ }
+ }
+
+ return processed;
+}
+
+// Alias for backward compatibility
+export const processHistoricalEvents = processTranscriptEvents;
+
+function convertToProcessedMessage(event: TranscriptEvent, overrideText?: string): ProcessedMessage {
+ return {
+ id: `${event.type}-${event.timestamp}`,
+ type: event.type,
+ text: overrideText ?? event.text,
+ final: event.final ?? true,
+ timestamp: event.timestamp,
+ functionName: event.functionName,
+ status: event.status,
+ nodeName: event.nodeName,
+ ttfbSeconds: event.ttfbSeconds,
+ };
+}
diff --git a/ui/src/client/types.gen.ts b/ui/src/client/types.gen.ts
index 67cd7dd..aee4bea 100644
--- a/ui/src/client/types.gen.ts
+++ b/ui/src/client/types.gen.ts
@@ -964,6 +964,9 @@ export type WorkflowRunResponseSchema = {
[key: string]: unknown;
} | null;
call_type: CallType;
+ logs?: {
+ [key: string]: unknown;
+ } | null;
};
export type WorkflowRunUsageResponse = {