dograh/api/services/workflow/text_chat_logs.py

145 lines
5.7 KiB
Python
Raw Permalink Normal View History

"""Helpers for projecting text-chat session state into run-log snapshots."""
from typing import Any
from api.services.pipecat.realtime_feedback_events import (
build_bot_text_event,
build_function_call_end_event,
build_function_call_start_event,
build_node_transition_event,
build_pipeline_error_event,
build_user_transcription_event,
realtime_feedback_event_sort_key,
stamp_realtime_feedback_event,
)
def visible_text_chat_turns(session_data: dict[str, Any]) -> list[dict[str, Any]]:
"""Return the active branch of turns for the current text-chat session.
After a rewind, `session_data["turns"]` may still contain future turns until
the next message is sent. Those turns are no longer part of the visible
branch, so callers that synthesize transcript/log views should trim at
`cursor_turn_id`.
"""
turns = list(session_data.get("turns") or [])
cursor_turn_id = session_data.get("cursor_turn_id")
if cursor_turn_id is None:
return turns
for index, turn in enumerate(turns):
if turn.get("id") == cursor_turn_id:
return turns[: index + 1]
return turns
def build_text_chat_realtime_feedback_events(
session_data: dict[str, Any],
) -> list[dict[str, Any]]:
"""Project text-chat session state into `workflow_runs.logs` event format.
`workflow_run_text_sessions` holds the authoritative rewindable conversation
state. Historical run pages and QA helpers read the normalized
`workflow_runs.logs.realtime_feedback_events` schema instead, so this helper
rebuilds that snapshot from the currently visible branch.
"""
events: list[dict[str, Any]] = []
last_emitted_node_id: str | None = None
for turn_index, turn in enumerate(visible_text_chat_turns(session_data)):
turn_events = list(turn.get("events") or [])
for event in turn_events:
payload = dict(event.get("payload") or {})
event_type = event.get("type")
timestamp = event.get("created_at") or turn.get("created_at")
if event_type == "node_transition":
node_id = payload.get("node_id")
if node_id is not None and node_id == last_emitted_node_id:
continue
snapshot_event = stamp_realtime_feedback_event(
build_node_transition_event(
node_id=node_id,
node_name=payload.get("node_name"),
previous_node_id=payload.get("previous_node_id"),
previous_node_name=payload.get("previous_node_name"),
allow_interrupt=bool(payload.get("allow_interrupt", False)),
),
timestamp=timestamp,
turn=turn_index,
node_id=node_id,
node_name=payload.get("node_name"),
)
if node_id is not None:
last_emitted_node_id = node_id
events.append(snapshot_event)
elif event_type == "tool_call_started":
events.append(
stamp_realtime_feedback_event(
build_function_call_start_event(
function_name=payload.get("function_name"),
tool_call_id=payload.get("tool_call_id"),
arguments=payload.get("arguments"),
),
timestamp=timestamp,
turn=turn_index,
)
)
elif event_type == "tool_call_result":
events.append(
stamp_realtime_feedback_event(
build_function_call_end_event(
function_name=payload.get("function_name"),
tool_call_id=payload.get("tool_call_id"),
result=payload.get("result"),
),
timestamp=timestamp,
turn=turn_index,
)
)
elif event_type == "execution_error":
events.append(
stamp_realtime_feedback_event(
build_pipeline_error_event(
error=payload.get("message", "Execution error"),
fatal=True,
),
timestamp=timestamp,
turn=turn_index,
)
)
user_message = turn.get("user_message") or {}
if user_message.get("text"):
message_timestamp = user_message.get("created_at") or turn.get("created_at")
events.append(
stamp_realtime_feedback_event(
build_user_transcription_event(
text=user_message["text"],
final=True,
timestamp=message_timestamp,
),
timestamp=message_timestamp,
turn=turn_index,
)
)
assistant_message = turn.get("assistant_message") or {}
if assistant_message.get("text"):
message_timestamp = assistant_message.get("created_at") or turn.get(
"created_at"
)
events.append(
stamp_realtime_feedback_event(
build_bot_text_event(
text=assistant_message["text"],
timestamp=message_timestamp,
),
timestamp=message_timestamp,
turn=turn_index,
)
)
return sorted(events, key=realtime_feedback_event_sort_key)