feat: add rtf log when user speaks when muted

This commit is contained in:
Abhishek Kumar 2026-03-21 13:55:34 +05:30
parent 93c45580e7
commit 1967a71935
13 changed files with 196 additions and 31 deletions

View file

@ -30,6 +30,8 @@ if TYPE_CHECKING:
from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
@ -37,11 +39,12 @@ from pipecat.frames.frames import (
FunctionCallResultFrame,
InterimTranscriptionFrame,
InterruptionFrame,
LLMTextFrame,
MetricsFrame,
StopFrame,
TranscriptionFrame,
TTSSpeakFrame,
TTSTextFrame,
UserMuteStartedFrame,
UserMuteStoppedFrame,
)
from pipecat.metrics.metrics import TTFBMetricsData
from pipecat.observers.base_observer import BaseObserver, FramePushed
@ -174,6 +177,30 @@ class RealtimeFeedbackObserver(BaseObserver):
await self._handle_interruption()
return
# Bot speaking state - WS only (ephemeral state signals, not persisted)
if isinstance(frame, BotStartedSpeakingFrame):
await self._send_ws(
{"type": RealtimeFeedbackType.BOT_STARTED_SPEAKING.value, "payload": {}}
)
return
if isinstance(frame, BotStoppedSpeakingFrame):
await self._send_ws(
{"type": RealtimeFeedbackType.BOT_STOPPED_SPEAKING.value, "payload": {}}
)
return
# User mute state - WS only (ephemeral state signals, not persisted)
if isinstance(frame, UserMuteStartedFrame):
await self._send_ws(
{"type": RealtimeFeedbackType.USER_MUTE_STARTED.value, "payload": {}}
)
return
if isinstance(frame, UserMuteStoppedFrame):
await self._send_ws(
{"type": RealtimeFeedbackType.USER_MUTE_STOPPED.value, "payload": {}}
)
return
# Skip already processed frames (frames can be observed multiple times)
if frame.id in self._frames_seen:
return
@ -206,20 +233,9 @@ class RealtimeFeedbackObserver(BaseObserver):
},
}
)
# Handle TTSSpeakFrame (e.g. greeting) - send immediately via WS only
# Final turn text is persisted via on_assistant_turn_stopped to avoid duplication
elif isinstance(frame, TTSSpeakFrame):
await self._send_ws(
{
"type": RealtimeFeedbackType.BOT_TEXT.value,
"payload": {
"text": frame.text,
},
}
)
# Handle bot TTS text - respect pts timing, WebSocket only
# Complete turn text is persisted via register_turn_handlers
elif isinstance(frame, LLMTextFrame):
elif isinstance(frame, TTSTextFrame):
message = {
"type": RealtimeFeedbackType.BOT_TEXT.value,
"payload": {

View file

@ -29,6 +29,7 @@ from pipecat.frames.frames import (
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSTextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@ -98,11 +99,9 @@ class RecordingRouterProcessor(FrameProcessor):
await self.push_frame(frame, direction)
return
# --- Recording mode: buffer recording_id, suppress TTS ---
# --- Recording mode: accumulate recording_id silently ---
if self._mode == "recording":
self._recording_id_buffer += frame.text
frame.skip_tts = True
await self.push_frame(frame, direction)
return
# --- Detection mode: buffer until marker found ---
@ -114,13 +113,11 @@ class RecordingRouterProcessor(FrameProcessor):
self._mode = "recording"
marker_end = buffered_text.index(RECORDING_MARKER) + len(RECORDING_MARKER)
# Push buffered frames with skip_tts, extract recording_id from post-marker text
# Extract recording_id from post-marker text (don't push frames)
cumulative = 0
for buf_frame, buf_dir in self._frame_buffer:
buf_frame.skip_tts = True
frame_start = cumulative
cumulative += len(buf_frame.text)
await self.push_frame(buf_frame, buf_dir)
# Capture any recording_id text after the marker
if cumulative > marker_end:
@ -183,6 +180,13 @@ class RecordingRouterProcessor(FrameProcessor):
if self._mode == "recording":
recording_id = self._recording_id_buffer.strip()
if recording_id:
# Push accumulated text as TTSTextFrame for UI feedback via observer
await self.push_frame(
TTSTextFrame(
text=RECORDING_MARKER + self._recording_id_buffer,
aggregated_by="recording_router",
)
)
await self._play_recording(recording_id)
else:
logger.warning(

View file

@ -526,6 +526,7 @@ async def _run_pipeline(
node_name: str,
previous_node_id: Optional[str],
previous_node_name: Optional[str],
allow_interrupt: bool = False,
) -> None:
"""Send node transition event to logs buffer and optionally via WebSocket."""
# Update current node on the buffer so subsequent events are tagged
@ -538,6 +539,7 @@ async def _run_pipeline(
"node_name": node_name,
"previous_node_id": previous_node_id,
"previous_node_name": previous_node_name,
"allow_interrupt": allow_interrupt,
},
}
# Send via WebSocket if available

View file

@ -67,7 +67,7 @@ class PipecatEngine:
call_context_vars: dict,
workflow_run_id: Optional[int] = None,
node_transition_callback: Optional[
Callable[[str, str, Optional[str], Optional[str]], Awaitable[None]]
Callable[[str, str, Optional[str], Optional[str], bool], Awaitable[None]]
] = None,
embeddings_api_key: Optional[str] = None,
embeddings_model: Optional[str] = None,
@ -521,7 +521,11 @@ class PipecatEngine:
if self._node_transition_callback:
try:
await self._node_transition_callback(
node_id, node.name, previous_node_id, previous_node_name
node_id,
node.name,
previous_node_id,
previous_node_name,
node.allow_interrupt,
)
except Exception as e:
# Log but don't fail - feedback is non-critical