feat: add rtf in logs (#119)

* feat: add rtf in logs

* chore: unify the call logs and real time events
This commit is contained in:
Abhishek 2026-01-15 16:17:17 +05:30 committed by GitHub
parent a172db8022
commit cac25879bf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 861 additions and 206 deletions

View file

@ -611,6 +611,7 @@ async def get_workflow_run(
"initial_context": run.initial_context,
"gathered_context": run.gathered_context,
"call_type": run.call_type,
"logs": run.logs,
}

View file

@ -20,3 +20,4 @@ class WorkflowRunResponseSchema(BaseModel):
initial_context: dict | None = None
gathered_context: dict | None = None
call_type: CallType
logs: Dict[str, Any] | None = None

View file

@ -4,8 +4,9 @@ from api.db import db_client
from api.enums import WorkflowRunState
from api.services.campaign.call_dispatcher import campaign_call_dispatcher
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_transcript_buffers import (
from api.services.pipecat.in_memory_buffers import (
InMemoryAudioBuffer,
InMemoryLogsBuffer,
InMemoryTranscriptBuffer,
)
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
@ -80,6 +81,7 @@ def register_task_event_handler(
audio_buffer: AudioBufferProcessor,
in_memory_audio_buffer: InMemoryAudioBuffer,
in_memory_transcript_buffer: InMemoryTranscriptBuffer,
in_memory_logs_buffer: InMemoryLogsBuffer,
pipeline_metrics_aggregator: PipelineMetricsAggregator,
):
@task.event_handler("on_pipeline_started")
@ -185,6 +187,22 @@ def register_task_event_handler(
state=WorkflowRunState.COMPLETED.value,
)
# Save real-time feedback logs to workflow run
if not in_memory_logs_buffer.is_empty:
try:
feedback_events = in_memory_logs_buffer.get_events()
await db_client.update_workflow_run(
run_id=workflow_run_id,
logs={"realtime_feedback_events": feedback_events},
)
logger.debug(
f"Saved {len(feedback_events)} feedback events to workflow run logs"
)
except Exception as e:
logger.error(f"Error saving realtime feedback logs: {e}", exc_info=True)
else:
logger.debug("Logs buffer is empty, skipping save")
# Release concurrent slot for campaign calls
if workflow_run and workflow_run.campaign_id:
await campaign_call_dispatcher.release_call_slot(workflow_run_id)

View file

@ -2,6 +2,7 @@ import asyncio
import re
import tempfile
import wave
from datetime import UTC, datetime
from typing import List
from loguru import logger
@ -120,3 +121,41 @@ class InMemoryTranscriptBuffer:
if self._USER_SPEECH_RE.match(line):
return True
return False
class InMemoryLogsBuffer:
"""Buffer real-time feedback events in memory during a call, then save to workflow run logs."""
def __init__(self, workflow_run_id: int):
self._workflow_run_id = workflow_run_id
self._events: List[dict] = []
self._turn_counter = 0
async def append(self, event: dict):
"""Append a feedback event to the buffer with timestamp."""
# Add timestamp and turn tracking
timestamped_event = {
**event,
"timestamp": datetime.now(UTC).isoformat(),
"turn": self._turn_counter,
}
self._events.append(timestamped_event)
logger.trace(
f"Appended event {event.get('type')} to logs buffer for workflow {self._workflow_run_id}"
)
def increment_turn(self):
"""Increment turn counter (called on user transcription completion)."""
self._turn_counter += 1
logger.trace(
f"Incremented turn counter to {self._turn_counter} for workflow {self._workflow_run_id}"
)
def get_events(self) -> List[dict]:
"""Get all events for final storage."""
return self._events
@property
def is_empty(self) -> bool:
"""Check if the buffer is empty."""
return len(self._events) == 0

View file

@ -1,19 +1,27 @@
"""Real-time feedback observer for sending pipeline events to the frontend.
This observer watches pipeline frames and sends relevant events (transcriptions,
bot text) over WebSocket to provide real-time feedback in the UI.
bot text, function calls, TTFB metrics) over WebSocket to provide real-time
feedback in the UI.
For frames with presentation timestamps (pts), like TTSTextFrame, we respect
the timing by queuing them and sending at the appropriate time, similar to
how base_output.py handles timed frames.
Note: Node transition events are sent directly from PipecatEngine.set_node()
rather than being observed here, to ensure precise timing at the moment of
node changes.
"""
import asyncio
import time
from typing import Awaitable, Callable, Optional, Set
from typing import TYPE_CHECKING, Awaitable, Callable, Optional, Set
from loguru import logger
if TYPE_CHECKING:
from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
@ -21,33 +29,46 @@ from pipecat.frames.frames import (
FunctionCallResultFrame,
InterimTranscriptionFrame,
InterruptionFrame,
MetricsFrame,
StopFrame,
TranscriptionFrame,
TTSTextFrame,
)
from pipecat.metrics.metrics import TTFBMetricsData
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection
from pipecat.utils.time import nanoseconds_to_seconds
class RealtimeFeedbackObserver(BaseObserver):
"""Observer that sends real-time transcription and bot response events via WebSocket.
"""Observer that sends real-time transcription, bot response, and metrics via WebSocket.
Observes pipeline frames and sends events for:
- User transcriptions (interim and final)
- Bot TTS text (with pts-based timing)
- Function calls (start/end)
- TTFB metrics (LLM generation time only - filters to processors containing "LLM")
For frames with pts (presentation timestamp), we queue them and send at the
appropriate time to sync with audio playback.
Note: Node transitions are handled by PipecatEngine.set_node() callback.
"""
def __init__(
self,
ws_sender: Callable[[dict], Awaitable[None]],
logs_buffer: Optional["InMemoryLogsBuffer"] = None,
):
"""
Args:
ws_sender: Async function to send messages over WebSocket.
Expected signature: async def send(message: dict) -> None
logs_buffer: Optional InMemoryLogsBuffer to persist events for post-call analysis.
"""
super().__init__()
self._ws_sender = ws_sender
self._logs_buffer = logs_buffer
self._frames_seen: Set[str] = set()
# Clock/timing for pts-based frames (similar to base_output.py)
@ -126,6 +147,8 @@ class RealtimeFeedbackObserver(BaseObserver):
frame = data.frame
frame_direction = data.direction
logger.trace(f"{self} Received Frame: {frame} Direction: {frame_direction}")
# Handle pipeline termination - stop clock task
if isinstance(frame, (EndFrame, CancelFrame, StopFrame)):
await self._cancel_clock_task()
@ -167,6 +190,9 @@ class RealtimeFeedbackObserver(BaseObserver):
},
}
)
# Increment turn counter on final user transcription
if self._logs_buffer:
self._logs_buffer.increment_turn()
# Handle bot TTS text - respect pts timing
elif isinstance(frame, TTSTextFrame):
message = {
@ -217,11 +243,36 @@ class RealtimeFeedbackObserver(BaseObserver):
},
}
)
# Handle TTFB metrics - capture LLM generation time only
elif isinstance(frame, MetricsFrame):
# Check if this MetricsFrame contains TTFB data from an LLM processor
for metric_data in frame.data:
if isinstance(metric_data, TTFBMetricsData):
# Only send TTFB if it's from an LLM processor
if metric_data.processor and "LLM" in metric_data.processor:
await self._send_message(
{
"type": "rtf-ttfb-metric",
"payload": {
"ttfb_seconds": metric_data.value,
"processor": metric_data.processor,
"model": metric_data.model,
},
}
)
async def _send_message(self, message: dict):
"""Send message via WebSocket, handling errors gracefully."""
"""Send message via WebSocket AND append to logs buffer, handling errors gracefully."""
# Send via WebSocket
try:
await self._ws_sender(message)
except Exception as e:
# Log but don't fail - feedback is non-critical
logger.debug(f"Failed to send real-time feedback message: {e}")
# Also append to logs buffer
if self._logs_buffer:
try:
await self._logs_buffer.append(message)
except Exception as e:
logger.error(f"Failed to append to logs buffer: {e}")

View file

@ -14,6 +14,7 @@ from api.services.pipecat.event_handlers import (
register_transcript_handler,
register_transport_event_handlers,
)
from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer
from api.services.pipecat.pipeline_builder import (
build_pipeline,
create_pipeline_components,
@ -467,11 +468,45 @@ async def _run_pipeline(
ReactFlowDTO.model_validate(workflow.workflow_definition_with_fallback)
)
# Create in-memory logs buffer early so it can be used by engine callbacks
in_memory_logs_buffer = InMemoryLogsBuffer(workflow_run_id)
# Create node transition callback if WebSocket sender is available
node_transition_callback = None
ws_sender = get_ws_sender(workflow_run_id)
if ws_sender:
async def send_node_transition(
node_name: str, previous_node: Optional[str]
) -> None:
"""Send node transition event via WebSocket AND log to buffer."""
message = {
"type": "rtf-node-transition",
"payload": {
"node_name": node_name,
"previous_node": previous_node,
},
}
# Send via WebSocket
try:
await ws_sender(message)
except Exception as e:
logger.debug(f"Failed to send node transition via WebSocket: {e}")
# Log to in-memory buffer
try:
await in_memory_logs_buffer.append(message)
except Exception as e:
logger.error(f"Failed to append node transition to logs buffer: {e}")
node_transition_callback = send_node_transition
engine = PipecatEngine(
llm=llm,
workflow=workflow_graph,
call_context_vars=merged_call_context_vars,
workflow_run_id=workflow_run_id,
node_transition_callback=node_transition_callback,
)
# Create pipeline components with audio configuration and engine
@ -566,12 +601,6 @@ async def _run_pipeline(
# Create pipeline task with audio configuration
task = create_pipeline_task(pipeline, workflow_run_id, audio_config)
# Add real-time feedback observer if WebSocket sender is available
ws_sender = get_ws_sender(workflow_run_id)
if ws_sender:
feedback_observer = RealtimeFeedbackObserver(ws_sender=ws_sender)
task.add_observer(feedback_observer)
# Now set the task on the engine
engine.set_task(task)
@ -590,6 +619,15 @@ async def _run_pipeline(
)
)
# Add real-time feedback observer if WebSocket sender is available
# Note: ws_sender was already fetched earlier for node_transition_callback
if ws_sender:
feedback_observer = RealtimeFeedbackObserver(
ws_sender=ws_sender,
logs_buffer=in_memory_logs_buffer,
)
task.add_observer(feedback_observer)
register_task_event_handler(
workflow_run_id,
engine,
@ -598,6 +636,7 @@ async def _run_pipeline(
audio_buffer,
in_memory_audio_buffer,
in_memory_transcript_buffer,
in_memory_logs_buffer,
pipeline_metrics_aggregator,
)

View file

@ -62,6 +62,9 @@ class PipecatEngine:
call_context_vars: dict,
audio_buffer: Optional["AudioBuffer"] = None,
workflow_run_id: Optional[int] = None,
node_transition_callback: Optional[
Callable[[str, Optional[str]], Awaitable[None]]
] = None,
):
self.task = task
self.llm = llm
@ -71,6 +74,7 @@ class PipecatEngine:
self._call_context_vars = call_context_vars
self._audio_buffer = audio_buffer
self._workflow_run_id = workflow_run_id
self._node_transition_callback = node_transition_callback
self._initialized = False
self._client_disconnected = False
self._call_disposed = False
@ -359,9 +363,20 @@ class PipecatEngine:
f"Executing node: name: {node.name} is_static: {node.is_static} allow_interrupt: {node.allow_interrupt} is_end: {node.is_end}"
)
# Track previous node for transition event
previous_node_name = self._current_node.name if self._current_node else None
# Set current node for all nodes (including static ones) so STT mute filter works
self._current_node = node
# Send node transition event if callback is provided
if self._node_transition_callback:
try:
await self._node_transition_callback(node.name, previous_node_name)
except Exception as e:
# Log but don't fail - feedback is non-critical
logger.debug(f"Failed to send node transition event: {e}")
# Handle start nodes
if node.is_start:
await self._handle_start_node(node)
@ -693,5 +708,3 @@ class PipecatEngine:
and not self._user_response_timeout_task.done()
):
self._user_response_timeout_task.cancel()
# Note: Native VoicemailDetector cleanup is handled by the pipeline