chore: unify the call logs and real-time events

This commit is contained in:
Abhishek Kumar 2026-01-15 16:16:32 +05:30
parent d25f898a8f
commit 58f0bbe184
16 changed files with 753 additions and 359 deletions

View file

@ -47,7 +47,6 @@ def register_transport_event_handlers(
num_channels=num_channels,
)
in_memory_transcript_buffer = InMemoryTranscriptBuffer(workflow_run_id)
in_memory_logs_buffer = InMemoryLogsBuffer(workflow_run_id)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, participant):
@ -71,7 +70,7 @@ def register_transport_event_handlers(
await task.cancel()
# Return the buffers so they can be passed to other handlers
return in_memory_audio_buffer, in_memory_transcript_buffer, in_memory_logs_buffer
return in_memory_audio_buffer, in_memory_transcript_buffer
def register_task_event_handler(

View file

@ -1,11 +1,16 @@
"""Real-time feedback observer for sending pipeline events to the frontend.
This observer watches pipeline frames and sends relevant events (transcriptions,
bot text) over WebSocket to provide real-time feedback in the UI.
bot text, function calls, TTFB metrics) over WebSocket to provide real-time
feedback in the UI.
For frames with presentation timestamps (pts), like TTSTextFrame, we respect
the timing by queuing them and sending at the appropriate time, similar to
how base_output.py handles timed frames.
Note: Node transition events are sent directly from PipecatEngine.set_node()
rather than being observed here, to ensure precise timing at the moment of
node changes.
"""
import asyncio
@ -24,20 +29,30 @@ from pipecat.frames.frames import (
FunctionCallResultFrame,
InterimTranscriptionFrame,
InterruptionFrame,
MetricsFrame,
StopFrame,
TranscriptionFrame,
TTSTextFrame,
)
from pipecat.metrics.metrics import TTFBMetricsData
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection
from pipecat.utils.time import nanoseconds_to_seconds
class RealtimeFeedbackObserver(BaseObserver):
"""Observer that sends real-time transcription and bot response events via WebSocket.
"""Observer that sends real-time transcription, bot response, and metrics via WebSocket.
Observes pipeline frames and sends events for:
- User transcriptions (interim and final)
- Bot TTS text (with pts-based timing)
- Function calls (start/end)
- TTFB metrics (LLM generation time only - filters to processors containing "LLM")
For frames with pts (presentation timestamp), we queue them and send at the
appropriate time to sync with audio playback.
Note: Node transitions are handled by PipecatEngine.set_node() callback.
"""
def __init__(
@ -132,6 +147,8 @@ class RealtimeFeedbackObserver(BaseObserver):
frame = data.frame
frame_direction = data.direction
logger.trace(f"{self} Received Frame: {frame} Direction: {frame_direction}")
# Handle pipeline termination - stop clock task
if isinstance(frame, (EndFrame, CancelFrame, StopFrame)):
await self._cancel_clock_task()
@ -226,6 +243,23 @@ class RealtimeFeedbackObserver(BaseObserver):
},
}
)
# Handle TTFB metrics - capture LLM generation time only
elif isinstance(frame, MetricsFrame):
# Check if this MetricsFrame contains TTFB data from an LLM processor
for metric_data in frame.data:
if isinstance(metric_data, TTFBMetricsData):
# Only send TTFB if it's from an LLM processor
if metric_data.processor and "LLM" in metric_data.processor:
await self._send_message(
{
"type": "rtf-ttfb-metric",
"payload": {
"ttfb_seconds": metric_data.value,
"processor": metric_data.processor,
"model": metric_data.model,
},
}
)
async def _send_message(self, message: dict):
"""Send message via WebSocket AND append to logs buffer, handling errors gracefully."""

View file

@ -14,6 +14,7 @@ from api.services.pipecat.event_handlers import (
register_transcript_handler,
register_transport_event_handlers,
)
from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer
from api.services.pipecat.pipeline_builder import (
build_pipeline,
create_pipeline_components,
@ -467,11 +468,45 @@ async def _run_pipeline(
ReactFlowDTO.model_validate(workflow.workflow_definition_with_fallback)
)
# Create in-memory logs buffer early so it can be used by engine callbacks
in_memory_logs_buffer = InMemoryLogsBuffer(workflow_run_id)
# Create node transition callback if WebSocket sender is available
node_transition_callback = None
ws_sender = get_ws_sender(workflow_run_id)
if ws_sender:
async def send_node_transition(
    node_name: str, previous_node: Optional[str]
) -> None:
    """Publish an ``rtf-node-transition`` event for a workflow node change.

    The event is first handed to ``ws_sender`` and then appended to
    ``in_memory_logs_buffer``. Each step is guarded on its own, so a failure
    in one does not prevent the other and nothing is raised to the caller.

    Args:
        node_name: Name of the node being transitioned to.
        previous_node: Name of the node being left, or ``None`` if there
            was no previous node.
    """
    payload = {
        "node_name": node_name,
        "previous_node": previous_node,
    }
    event = {"type": "rtf-node-transition", "payload": payload}

    # Live delivery: a WebSocket failure is only logged at debug level.
    try:
        await ws_sender(event)
    except Exception as e:
        logger.debug(f"Failed to send node transition via WebSocket: {e}")

    # Call-log record: a buffer failure is logged as an error.
    try:
        await in_memory_logs_buffer.append(event)
    except Exception as e:
        logger.error(f"Failed to append node transition to logs buffer: {e}")
node_transition_callback = send_node_transition
engine = PipecatEngine(
llm=llm,
workflow=workflow_graph,
call_context_vars=merged_call_context_vars,
workflow_run_id=workflow_run_id,
node_transition_callback=node_transition_callback,
)
# Create pipeline components with audio configuration and engine
@ -573,7 +608,7 @@ async def _run_pipeline(
await engine.initialize()
# Register event handlers
in_memory_audio_buffer, in_memory_transcript_buffer, in_memory_logs_buffer = (
in_memory_audio_buffer, in_memory_transcript_buffer = (
register_transport_event_handlers(
task,
transport,
@ -585,10 +620,11 @@ async def _run_pipeline(
)
# Add real-time feedback observer if WebSocket sender is available
ws_sender = get_ws_sender(workflow_run_id)
# Note: ws_sender was already fetched earlier for node_transition_callback
if ws_sender:
feedback_observer = RealtimeFeedbackObserver(
ws_sender=ws_sender, logs_buffer=in_memory_logs_buffer
ws_sender=ws_sender,
logs_buffer=in_memory_logs_buffer,
)
task.add_observer(feedback_observer)

View file

@ -62,6 +62,9 @@ class PipecatEngine:
call_context_vars: dict,
audio_buffer: Optional["AudioBuffer"] = None,
workflow_run_id: Optional[int] = None,
node_transition_callback: Optional[
Callable[[str, Optional[str]], Awaitable[None]]
] = None,
):
self.task = task
self.llm = llm
@ -71,6 +74,7 @@ class PipecatEngine:
self._call_context_vars = call_context_vars
self._audio_buffer = audio_buffer
self._workflow_run_id = workflow_run_id
self._node_transition_callback = node_transition_callback
self._initialized = False
self._client_disconnected = False
self._call_disposed = False
@ -359,9 +363,20 @@ class PipecatEngine:
f"Executing node: name: {node.name} is_static: {node.is_static} allow_interrupt: {node.allow_interrupt} is_end: {node.is_end}"
)
# Track previous node for transition event
previous_node_name = self._current_node.name if self._current_node else None
# Set current node for all nodes (including static ones) so STT mute filter works
self._current_node = node
# Send node transition event if callback is provided
if self._node_transition_callback:
try:
await self._node_transition_callback(node.name, previous_node_name)
except Exception as e:
# Log but don't fail - feedback is non-critical
logger.debug(f"Failed to send node transition event: {e}")
# Handle start nodes
if node.is_start:
await self._handle_start_node(node)
@ -693,5 +708,3 @@ class PipecatEngine:
and not self._user_response_timeout_task.done()
):
self._user_response_timeout_task.cancel()
# Note: Native VoicemailDetector cleanup is handled by the pipeline