mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
* feat: add backend foundations * feat: add text chat UI * chore: simplify the reload behaviour * fix: fix upgrade banner to be triggered after package upload * feat: simplify TesterPanel design * chore: fix formatting and generate client * chore: fix tracing for text chat mode * fix: fix revert and edit CTA * refactor: refactor TesterPanel into smaller components * feat: enable runtime transition of nodes * fix: fix review comments
163 lines
4.1 KiB
Python
163 lines
4.1 KiB
Python
"""Shared helpers for building and ordering realtime feedback events."""
|
|
|
|
from typing import Any
|
|
|
|
from pipecat.utils.enums import RealtimeFeedbackType
|
|
|
|
|
|
def build_node_transition_event(
    *,
    node_id: str | None,
    node_name: str | None,
    previous_node_id: str | None,
    previous_node_name: str | None,
    allow_interrupt: bool = False,
) -> dict[str, Any]:
    """Build a node-transition feedback event.

    All arguments are keyword-only and are copied unchanged into the
    event payload under keys of the same name.

    Returns:
        A dict with ``"type"`` set to
        ``RealtimeFeedbackType.NODE_TRANSITION.value`` and ``"payload"``
        holding the five transition fields.
    """
    transition_details: dict[str, Any] = dict(
        node_id=node_id,
        node_name=node_name,
        previous_node_id=previous_node_id,
        previous_node_name=previous_node_name,
        allow_interrupt=allow_interrupt,
    )
    event_type = RealtimeFeedbackType.NODE_TRANSITION.value
    return dict(type=event_type, payload=transition_details)
|
|
|
|
|
|
def build_user_transcription_event(
    *,
    text: str,
    final: bool,
    timestamp: str | None = None,
    user_id: str | None = None,
) -> dict[str, Any]:
    """Build a user-transcription feedback event.

    ``text`` and ``final`` are always present in the payload.
    ``timestamp`` and ``user_id`` are added (in that order) only when
    they are not ``None``.

    Returns:
        A dict with ``"type"`` set to
        ``RealtimeFeedbackType.USER_TRANSCRIPTION.value`` and the
        assembled ``"payload"``.
    """
    optional_fields = (("timestamp", timestamp), ("user_id", user_id))
    body: dict[str, Any] = dict(text=text, final=final)
    # Omit optional keys entirely rather than emitting explicit nulls.
    body.update(
        {key: value for key, value in optional_fields if value is not None}
    )
    event_type = RealtimeFeedbackType.USER_TRANSCRIPTION.value
    return dict(type=event_type, payload=body)
|
|
|
|
|
|
def build_bot_text_event(
    *,
    text: str,
    timestamp: str | None = None,
) -> dict[str, Any]:
    """Build a bot-text feedback event.

    The payload always carries ``text``; ``timestamp`` is included only
    when it is not ``None``.

    Returns:
        A dict with ``"type"`` set to
        ``RealtimeFeedbackType.BOT_TEXT.value`` and the assembled
        ``"payload"``.
    """
    body: dict[str, Any]
    if timestamp is None:
        body = dict(text=text)
    else:
        body = dict(text=text, timestamp=timestamp)
    event_type = RealtimeFeedbackType.BOT_TEXT.value
    return dict(type=event_type, payload=body)
|
|
|
|
|
|
def build_function_call_start_event(
    *,
    function_name: str | None,
    tool_call_id: str | None,
    arguments: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build a function-call-start feedback event.

    ``function_name`` and ``tool_call_id`` are always present in the
    payload (even when ``None``). ``arguments`` is included only when it
    is not ``None``; the same object is stored, not a copy.

    Returns:
        A dict with ``"type"`` set to
        ``RealtimeFeedbackType.FUNCTION_CALL_START.value`` and the
        assembled ``"payload"``.
    """
    call_details: dict[str, Any] = dict(
        function_name=function_name,
        tool_call_id=tool_call_id,
    )
    include_arguments = arguments is not None
    if include_arguments:
        call_details["arguments"] = arguments
    event_type = RealtimeFeedbackType.FUNCTION_CALL_START.value
    return dict(type=event_type, payload=call_details)
|
|
|
|
|
|
def serialize_realtime_feedback_tool_result(result: Any) -> str | None:
|
|
"""Normalize function-call results to the string shape stored in logs."""
|
|
if result is None:
|
|
return None
|
|
return str(result)
|
|
|
|
|
|
def build_function_call_end_event(
    *,
    function_name: str | None,
    tool_call_id: str | None,
    result: Any,
) -> dict[str, Any]:
    """Build a function-call-end feedback event.

    ``result`` is passed through
    ``serialize_realtime_feedback_tool_result`` before being stored, so
    the payload holds its serialized form rather than the raw value.

    Returns:
        A dict with ``"type"`` set to
        ``RealtimeFeedbackType.FUNCTION_CALL_END.value`` and a
        ``"payload"`` containing ``function_name``, ``tool_call_id`` and
        the serialized ``result``.
    """
    serialized_result = serialize_realtime_feedback_tool_result(result)
    call_outcome: dict[str, Any] = dict(
        function_name=function_name,
        tool_call_id=tool_call_id,
        result=serialized_result,
    )
    event_type = RealtimeFeedbackType.FUNCTION_CALL_END.value
    return dict(type=event_type, payload=call_outcome)
|
|
|
|
|
|
def build_ttfb_metric_event(
    *,
    ttfb_seconds: float,
    processor: str | None,
    model: str | None,
) -> dict[str, Any]:
    """Build a TTFB metric feedback event.

    The three keyword-only arguments are copied unchanged into the
    payload under keys of the same name.

    Returns:
        A dict with ``"type"`` set to
        ``RealtimeFeedbackType.TTFB_METRIC.value`` and the metric
        ``"payload"``.
    """
    metric: dict[str, Any] = dict(
        ttfb_seconds=ttfb_seconds,
        processor=processor,
        model=model,
    )
    event_type = RealtimeFeedbackType.TTFB_METRIC.value
    return dict(type=event_type, payload=metric)
|
|
|
|
|
|
def build_pipeline_error_event(
    *,
    error: str,
    fatal: bool,
    processor: str | None = None,
    extra_payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build a pipeline-error feedback event.

    ``error`` and ``fatal`` are always present in the payload.
    ``processor`` is added only when it is not ``None``. A non-empty
    ``extra_payload`` is merged in last, so its keys overwrite any
    same-named keys already set.

    Returns:
        A dict with ``"type"`` set to
        ``RealtimeFeedbackType.PIPELINE_ERROR.value`` and the assembled
        ``"payload"``.
    """
    error_details: dict[str, Any] = dict(error=error, fatal=fatal)
    if processor is not None:
        error_details["processor"] = processor
    # Merged after the fixed keys: extras win on key collisions.
    extras = extra_payload if extra_payload else {}
    error_details.update(extras)
    event_type = RealtimeFeedbackType.PIPELINE_ERROR.value
    return dict(type=event_type, payload=error_details)
|
|
|
|
|
|
def stamp_realtime_feedback_event(
|
|
event: dict[str, Any],
|
|
*,
|
|
timestamp: str | None = None,
|
|
turn: int | None = None,
|
|
node_id: str | None = None,
|
|
node_name: str | None = None,
|
|
) -> dict[str, Any]:
|
|
stamped = dict(event)
|
|
if timestamp is not None:
|
|
stamped["timestamp"] = timestamp
|
|
if turn is not None:
|
|
stamped["turn"] = turn
|
|
if node_id is not None:
|
|
stamped["node_id"] = node_id
|
|
if node_name is not None:
|
|
stamped["node_name"] = node_name
|
|
return stamped
|
|
|
|
|
|
def realtime_feedback_event_sort_key(event: dict[str, Any]) -> str:
    """Return the timestamp used to order a realtime feedback event.

    A truthy ``timestamp`` inside the event's ``"payload"`` takes
    precedence; otherwise the event's top-level ``"timestamp"`` is used.
    When neither is present (or both are falsy) the empty string is
    returned.
    """
    inner = event.get("payload")
    if inner:
        nested_timestamp = inner.get("timestamp")
        if nested_timestamp:
            return nested_timestamp
    outer_timestamp = event.get("timestamp")
    return outer_timestamp if outer_timestamp else ""
|