mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
144 lines
5.7 KiB
Python
144 lines
5.7 KiB
Python
"""Helpers for projecting text-chat session state into run-log snapshots."""
|
|
|
|
from typing import Any
|
|
|
|
from api.services.pipecat.realtime_feedback_events import (
|
|
build_bot_text_event,
|
|
build_function_call_end_event,
|
|
build_function_call_start_event,
|
|
build_node_transition_event,
|
|
build_pipeline_error_event,
|
|
build_user_transcription_event,
|
|
realtime_feedback_event_sort_key,
|
|
stamp_realtime_feedback_event,
|
|
)
|
|
|
|
|
|
def visible_text_chat_turns(session_data: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the active branch of turns for the current text-chat session.

    After a rewind, `session_data["turns"]` may still contain future turns until
    the next message is sent. Those turns are no longer part of the visible
    branch, so callers that synthesize transcript/log views should trim at
    `cursor_turn_id`.

    Args:
        session_data: Session state mapping. Only the `"turns"` and
            `"cursor_turn_id"` keys are read; a missing or falsy `"turns"`
            value is treated as an empty list.

    Returns:
        A new list containing the turns up to and including the one whose
        `"id"` equals `cursor_turn_id`. When `cursor_turn_id` is `None` or
        matches no turn, every turn is returned.
    """
    # Copy so callers never receive (and cannot mutate) the stored list itself.
    turns = list(session_data.get("turns") or [])
    cursor_turn_id = session_data.get("cursor_turn_id")
    if cursor_turn_id is None:
        return turns

    for index, turn in enumerate(turns):
        if turn.get("id") == cursor_turn_id:
            # Keep the cursor turn itself; drop everything after it.
            return turns[: index + 1]

    # Cursor id not present in the list: fall back to the untrimmed copy.
    return turns
|
|
|
|
|
|
def build_text_chat_realtime_feedback_events(
    session_data: dict[str, Any],
) -> list[dict[str, Any]]:
    """Project text-chat session state into `workflow_runs.logs` event format.

    `workflow_run_text_sessions` holds the authoritative rewindable conversation
    state. Historical run pages and QA helpers read the normalized
    `workflow_runs.logs.realtime_feedback_events` schema instead, so this helper
    rebuilds that snapshot from the currently visible branch.

    Args:
        session_data: Session state mapping; the turns to project are selected
            by `visible_text_chat_turns`.

    Returns:
        The stamped events for every visible turn, ordered by
        `realtime_feedback_event_sort_key`.
    """
    events: list[dict[str, Any]] = []
    # Tracks the last node a transition was emitted for. It is intentionally
    # not reset per turn, so a repeated transition to the same node is
    # suppressed across turn boundaries as well.
    last_emitted_node_id: str | None = None

    for turn_index, turn in enumerate(visible_text_chat_turns(session_data)):
        turn_events = list(turn.get("events") or [])
        for event in turn_events:
            payload = dict(event.get("payload") or {})
            event_type = event.get("type")
            # Events without their own timestamp inherit the turn's.
            timestamp = event.get("created_at") or turn.get("created_at")

            if event_type == "node_transition":
                node_id = payload.get("node_id")
                # Skip a transition that targets the node already emitted last.
                if node_id is not None and node_id == last_emitted_node_id:
                    continue
                snapshot_event = stamp_realtime_feedback_event(
                    build_node_transition_event(
                        node_id=node_id,
                        node_name=payload.get("node_name"),
                        previous_node_id=payload.get("previous_node_id"),
                        previous_node_name=payload.get("previous_node_name"),
                        allow_interrupt=bool(payload.get("allow_interrupt", False)),
                    ),
                    timestamp=timestamp,
                    turn=turn_index,
                    node_id=node_id,
                    node_name=payload.get("node_name"),
                )
                if node_id is not None:
                    last_emitted_node_id = node_id
                events.append(snapshot_event)
            elif event_type == "tool_call_started":
                events.append(
                    stamp_realtime_feedback_event(
                        build_function_call_start_event(
                            function_name=payload.get("function_name"),
                            tool_call_id=payload.get("tool_call_id"),
                            arguments=payload.get("arguments"),
                        ),
                        timestamp=timestamp,
                        turn=turn_index,
                    )
                )
            elif event_type == "tool_call_result":
                events.append(
                    stamp_realtime_feedback_event(
                        build_function_call_end_event(
                            function_name=payload.get("function_name"),
                            tool_call_id=payload.get("tool_call_id"),
                            result=payload.get("result"),
                        ),
                        timestamp=timestamp,
                        turn=turn_index,
                    )
                )
            elif event_type == "execution_error":
                events.append(
                    stamp_realtime_feedback_event(
                        build_pipeline_error_event(
                            error=payload.get("message", "Execution error"),
                            fatal=True,
                        ),
                        timestamp=timestamp,
                        turn=turn_index,
                    )
                )
            # Any other event type is not projected into the snapshot.

        # Messages are appended after the turn's events; the final sort
        # decides their position in the returned list.
        user_message = turn.get("user_message") or {}
        if user_message.get("text"):
            message_timestamp = user_message.get("created_at") or turn.get("created_at")
            events.append(
                stamp_realtime_feedback_event(
                    build_user_transcription_event(
                        text=user_message["text"],
                        final=True,
                        timestamp=message_timestamp,
                    ),
                    timestamp=message_timestamp,
                    turn=turn_index,
                )
            )

        assistant_message = turn.get("assistant_message") or {}
        if assistant_message.get("text"):
            message_timestamp = assistant_message.get("created_at") or turn.get(
                "created_at"
            )
            events.append(
                stamp_realtime_feedback_event(
                    build_bot_text_event(
                        text=assistant_message["text"],
                        timestamp=message_timestamp,
                    ),
                    timestamp=message_timestamp,
                    turn=turn_index,
                )
            )

    return sorted(events, key=realtime_feedback_event_sort_key)
|