feat: run per node QA

This commit is contained in:
Abhishek Kumar 2026-02-25 17:17:48 +05:30
parent a836825b83
commit c8742dbdc0
13 changed files with 1012 additions and 373 deletions

View file

@ -374,6 +374,36 @@ class WorkflowClient(BaseDBClient):
)
return result.scalar() or 0
async def update_definition_node_summaries(
    self, definition_id: int, node_summaries: dict
) -> None:
    """Store ``node_summaries`` under the ``"node_summaries"`` key of a definition.

    The definition row is looked up by ID. If no row matches, the method
    returns without doing anything. Otherwise a copy of the row's
    ``workflow_json`` with the new ``"node_summaries"`` value is assigned
    back and the session is committed.

    Args:
        definition_id: The ID of the WorkflowDefinitionModel to update.
        node_summaries: Dict mapping node_id to summary data
            (e.g. {"summary": "...", "trace_url": "..."}).

    Raises:
        Exception: Whatever the commit raised, re-raised after a rollback.
    """
    lookup = select(WorkflowDefinitionModel).where(
        WorkflowDefinitionModel.id == definition_id
    )
    async with self.async_session() as session:
        definition = (await session.execute(lookup)).scalars().first()
        if not definition:
            return

        # Assign a new dict instead of mutating the loaded value in place.
        definition.workflow_json = {
            **definition.workflow_json,
            "node_summaries": node_summaries,
        }

        try:
            await session.commit()
        except Exception:
            await session.rollback()
            raise
async def get_workflow_run_counts(self, workflow_ids: list[int]) -> dict[int, int]:
"""Get run counts for multiple workflows in a single query.

View file

@ -376,10 +376,11 @@ class WorkflowRunClient(BaseDBClient):
result = await session.execute(
select(WorkflowRunModel)
.options(
selectinload(WorkflowRunModel.definition),
selectinload(WorkflowRunModel.workflow).options(
selectinload(WorkflowModel.user),
selectinload(WorkflowModel.current_definition),
)
),
)
.where(WorkflowRunModel.id == workflow_run_id)
)

View file

@ -1,360 +0,0 @@
"""QA analysis service for post-call quality assessment.
Runs LLM-based analysis on call transcripts, traces under the same
Langfuse trace as the conversation, and returns structured results.
"""
import json
import re
from datetime import datetime
from typing import Any
from loguru import logger
from openai import AsyncOpenAI
from api.db import db_client
from api.db.models import WorkflowRunModel
from api.services.gen_ai.json_parser import parse_llm_json
from pipecat.utils.enums import RealtimeFeedbackType
def build_conversation_structure(logs: list[dict]) -> list[dict]:
    """Transform raw call logs into a conversation structure for LLM QA analysis.

    Only bot text events and *final* user transcriptions are kept; every other
    event is skipped. Offsets are measured from the ``timestamp`` of the first
    log entry.

    Args:
        logs: Raw realtime-feedback events. Each event is read for ``type``,
            ``payload`` (``text``, optional ``timestamp``, ``final``) and the
            optional ``node_name`` / ``turn`` keys.

    Returns:
        One dict per kept utterance with the keys ``time_from_start_seconds``,
        ``speaker`` ("assistant" or "user"), ``text``, ``node_name`` and
        ``turn``. An empty list when ``logs`` is empty.
    """
    if not logs:
        return []

    start_time = datetime.fromisoformat(logs[0]["timestamp"])
    conversation = []

    for event in logs:
        event_type = event["type"]
        if event_type == RealtimeFeedbackType.BOT_TEXT.value:
            speaker = "assistant"
        elif event_type == RealtimeFeedbackType.USER_TRANSCRIPTION.value and event[
            "payload"
        ].get("final", False):
            speaker = "user"
        else:
            continue

        payload = event["payload"]
        utterance_text = payload["text"]

        # Payloads do not always carry their own timestamp; fall back to the
        # event-level one instead of failing the whole analysis on a KeyError.
        try:
            raw_timestamp = payload["timestamp"]
        except KeyError:
            raw_timestamp = event["timestamp"]
        event_time = datetime.fromisoformat(raw_timestamp)

        time_from_start = (event_time - start_time).total_seconds()
        conversation.append(
            {
                "time_from_start_seconds": round(time_from_start, 2),
                "speaker": speaker,
                "text": utterance_text,
                "node_name": event.get("node_name", ""),
                "turn": event.get("turn", 0),
            }
        )

    return conversation
def format_transcript(conversation: list[dict]) -> str:
    """Render conversation entries as a newline-separated transcript for the LLM.

    Every entry becomes ``[<offset>s] <speaker>: <text>``, with the offset
    shown to one decimal place. An empty conversation gives an empty string.
    """
    return "\n".join(
        f"[{turn['time_from_start_seconds']:.1f}s] {turn['speaker']}: {turn['text']}"
        for turn in conversation
    )
def compute_call_metrics(
logs: list[dict], call_duration_seconds: float | None = None
) -> dict:
"""Pre-compute quantitative metrics from raw call logs."""
latencies = []
ttfb_values = []
for event in logs:
if event["type"] == RealtimeFeedbackType.LATENCY_MEASURED.value:
latencies.append(event["payload"]["latency_seconds"])
elif event["type"] == RealtimeFeedbackType.TTFB_METRIC.value:
ttfb_values.append(event["payload"]["ttfb_seconds"])
turns = set()
for event in logs:
if event["type"] in (
RealtimeFeedbackType.USER_TRANSCRIPTION.value,
RealtimeFeedbackType.BOT_TEXT.value,
):
turns.add(event.get("turn", 0))
return {
"call_duration_seconds": call_duration_seconds,
"num_turns": len(turns),
"avg_latency_seconds": (
round(sum(latencies) / len(latencies), 2) if latencies else None
),
"avg_ttfb_seconds": (
round(sum(ttfb_values) / len(ttfb_values), 2) if ttfb_values else None
),
"max_latency_seconds": round(max(latencies), 2) if latencies else None,
}
def _extract_trace_id(gathered_context: dict) -> str | None:
"""Extract Langfuse trace_id from gathered_context trace_url.
URL format: https://langfuse.dograh.com/project/<project_id>/traces/<trace_id>
"""
trace_url = gathered_context.get("trace_url")
if not trace_url:
return None
try:
match = re.search(r"/traces/([a-fA-F0-9]+)$", trace_url)
if match:
return match.group(1)
except Exception:
pass
return None
def _provider_base_url(provider: str | None, endpoint: str = "") -> str | None:
"""Return the base URL for a given LLM provider."""
if provider == "openrouter":
return "https://openrouter.ai/api/v1"
if provider == "groq":
return "https://api.groq.com/openai/v1"
if provider == "google":
return "https://generativelanguage.googleapis.com/v1beta/openai/"
if provider == "azure":
return endpoint or None
return None
async def _resolve_llm_config(
    qa_node_data: dict, workflow_run: WorkflowRunModel
) -> tuple[str, str, str | None]:
    """Work out which model, API key and base URL the QA analysis should use.

    When the QA node opts out of the workflow LLM (``qa_use_workflow_llm`` is
    falsy), its own ``qa_model`` / ``qa_api_key`` / ``qa_provider`` /
    ``qa_endpoint`` values are used as-is. Otherwise the LLM settings of the
    workflow's user are loaded; a ``qa_model`` other than ``"default"`` still
    overrides the user's model.

    Returns:
        (model, api_key, base_url) tuple
    """
    if not qa_node_data.get("qa_use_workflow_llm", True):
        node_base_url = _provider_base_url(
            qa_node_data.get("qa_provider"),
            qa_node_data.get("qa_endpoint", ""),
        )
        return (
            qa_node_data.get("qa_model"),
            qa_node_data.get("qa_api_key"),
            node_base_url,
        )

    # Workflow LLM requested: read the owning user's stored configuration.
    workflow = workflow_run.workflow
    user_id = workflow.user.id if workflow and workflow.user else None

    llm_config: dict = {}
    if user_id:
        user_configuration = await db_client.get_user_configurations(user_id)
        llm_config = user_configuration.model_dump(exclude_none=True).get("llm", {})

    provider = llm_config.get("provider", "openai")
    api_key = llm_config.get("api_key", "")

    requested_model = qa_node_data.get("qa_model", "default")
    if requested_model and requested_model != "default":
        model = requested_model
    else:
        model = llm_config.get("model", "gpt-4.1")

    base_url = _provider_base_url(provider, llm_config.get("endpoint", ""))
    # For openrouter, a user-configured base_url wins over the built-in one.
    if provider == "openrouter" and llm_config.get("base_url"):
        base_url = llm_config["base_url"]

    return model, api_key, base_url
async def run_qa_analysis(
    qa_node_data: dict[str, Any],
    workflow_run: WorkflowRunModel,
    workflow_run_id: int,
) -> dict[str, Any]:
    """Run QA analysis on a completed workflow run.

    Builds a transcript from the run's realtime feedback events, sends it with
    the QA node's system prompt to the resolved LLM, parses the JSON reply and
    attaches the generation to the conversation trace.

    Args:
        qa_node_data: The QA node's data dict from the workflow definition.
        workflow_run: The workflow run model with logs and context.
        workflow_run_id: The workflow run ID (used in log messages).

    Returns:
        On success a dict with ``raw_response``, ``model``, optional
        ``token_usage``, and ``tags`` / ``summary`` / ``score`` (plus
        ``overall_sentiment`` when the reply parsed). On an early exit or a
        failed LLM call, a dict with ``error`` and empty ``tags`` /
        ``summary`` / ``score``.
    """

    def _failure(reason: str) -> dict[str, Any]:
        # Every early exit shares this shape; only the error text differs.
        return {"error": reason, "tags": [], "summary": "", "score": None}

    # Transcript
    rtf_events = (workflow_run.logs or {}).get("realtime_feedback_events", [])
    if not rtf_events:
        logger.warning(f"No realtime_feedback_events for run {workflow_run_id}")
        return _failure("no_transcript")

    transcript = format_transcript(build_conversation_structure(rtf_events))
    if not transcript:
        logger.warning(f"Empty transcript for run {workflow_run_id}")
        return _failure("empty_transcript")

    # Metrics
    call_duration = (workflow_run.usage_info or {}).get("call_duration_seconds")
    metrics = compute_call_metrics(rtf_events, call_duration)

    # Prompt and LLM configuration
    system_prompt = qa_node_data.get("qa_system_prompt", "")
    if not system_prompt:
        logger.warning("No system prompt defined for QA Node")
        return _failure("no_system_prompt")

    model, api_key, base_url = await _resolve_llm_config(qa_node_data, workflow_run)
    if not api_key:
        logger.warning(
            f"No LLM API key configured for QA analysis on run {workflow_run_id}"
        )
        return _failure("no_api_key")

    messages = [
        {
            "role": "system",
            "content": system_prompt.replace(
                "{metrics}", json.dumps(metrics, indent=2)
            ),
        },
        {"role": "user", "content": f"## Transcript\n{transcript}"},
    ]

    # base_url is only passed when one was resolved.
    client_kwargs: dict[str, Any] = {"api_key": api_key}
    if base_url:
        client_kwargs["base_url"] = base_url
    client = AsyncOpenAI(**client_kwargs)

    try:
        response = await client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=0,
        )
        raw_response = response.choices[0].message.content
    except Exception as exc:
        logger.error(f"QA LLM call failed for run {workflow_run_id}: {exc}")
        return _failure(str(exc))

    result: dict[str, Any] = {"raw_response": raw_response, "model": model}

    usage = response.usage
    if usage:
        result["token_usage"] = {
            "prompt_tokens": usage.prompt_tokens or 0,
            "completion_tokens": usage.completion_tokens or 0,
            "total_tokens": usage.total_tokens or 0,
            "cache_read_input_tokens": getattr(usage, "cache_read_input_tokens", 0)
            or 0,
            "cache_creation_input_tokens": getattr(
                usage, "cache_creation_input_tokens", None
            ),
        }

    try:
        parsed = parse_llm_json(raw_response)
        result.update(
            tags=parsed.get("tags", []),
            summary=parsed.get("summary", ""),
            score=parsed.get("call_quality_score"),
            overall_sentiment=parsed.get("overall_sentiment"),
        )
    except (json.JSONDecodeError, ValueError):
        result.update(tags=[], summary="", score=None)

    # Langfuse tracing: attach the QA generation to the conversation trace.
    _add_qa_span_to_conversation_trace(
        workflow_run, model, messages, raw_response, result
    )

    return result
def _add_qa_span_to_conversation_trace(
    workflow_run: WorkflowRunModel,
    model: str,
    messages: list[dict],
    raw_response: str,
    result: dict,
) -> None:
    """Attach the QA generation to the existing Langfuse conversation trace.

    Uses OpenTelemetry directly to create a child span under the existing trace,
    matching the same attribute format used by the pipecat pipeline (gen_ai.*).

    The function is best-effort: it returns early when tracing is disabled or
    when no trace id can be extracted from the run's gathered context, and any
    exception raised along the way is logged as a warning and swallowed.

    Args:
        workflow_run: Run whose ``gathered_context`` is searched for the trace URL.
        model: Model name recorded on the span.
        messages: Chat messages recorded as the span input.
        raw_response: LLM output recorded as the span output.
        result: Accepted for the caller's convenience; not read by this function.
    """
    try:
        # Imported lazily inside the try block, so an import failure is
        # reported by the warning below instead of propagating to the caller.
        from opentelemetry import trace as otel_trace
        from opentelemetry.trace import (
            NonRecordingSpan,
            SpanContext,
            TraceFlags,
            set_span_in_context,
        )

        from api.services.pipecat.tracing_config import (
            is_tracing_enabled,
            setup_tracing_exporter,
        )
        from pipecat.utils.tracing.service_attributes import add_llm_span_attributes

        if not is_tracing_enabled():
            return

        # Ensure the OTEL exporter is initialized (idempotent — no-op if
        # already called in the pipeline process, required in the ARQ worker).
        setup_tracing_exporter()

        gathered_context = workflow_run.gathered_context or {}
        trace_id = _extract_trace_id(gathered_context)
        if not trace_id:
            logger.debug("No trace_id found, skipping Langfuse QA trace")
            return

        tracer = otel_trace.get_tracer("pipecat")

        # Create a remote parent context from the existing trace ID
        # (the id is a hexadecimal string, hence int(..., 16)).
        parent_span_ctx = SpanContext(
            trace_id=int(trace_id, 16),
            span_id=0x1,  # dummy parent span id
            is_remote=True,
            trace_flags=TraceFlags(0x01),
        )
        parent_ctx = set_span_in_context(NonRecordingSpan(parent_span_ctx))

        # Create a child span under the existing trace
        with tracer.start_as_current_span(
            "qa-analysis",
            context=parent_ctx,
        ) as span:
            add_llm_span_attributes(
                span,
                service_name="OpenAILLMService",
                model=model,
                operation_name="qa-analysis",
                messages=messages,
                output=raw_response,
                stream=False,
                parameters={"temperature": 0},
            )

    except Exception as e:
        # Tracing must never break QA analysis; log and carry on.
        logger.warning(f"Failed to trace QA to Langfuse: {e}")

View file

@ -0,0 +1,12 @@
"""QA analysis service for post-call quality assessment.
Runs LLM-based analysis on call transcripts, traces under the same
Langfuse trace as the conversation, and returns structured results.
Supports per-node QA analysis where each agent/start node gets its own
evaluation with node purpose summary and prior conversation context.
"""
from api.services.workflow.qa.analysis import run_per_node_qa_analysis
__all__ = ["run_per_node_qa_analysis"]

View file

@ -0,0 +1,333 @@
"""Main QA analysis orchestrator — per-node and whole-call fallback."""
import json
from typing import Any
from loguru import logger
from openai import AsyncOpenAI
from api.db.models import WorkflowRunModel
from api.services.gen_ai.json_parser import parse_llm_json
from api.services.workflow.qa.conversation import (
build_conversation_structure,
format_transcript,
split_events_by_node,
)
from api.services.workflow.qa.llm_config import (
accumulate_token_usage,
resolve_llm_config,
)
from api.services.workflow.qa.metrics import compute_call_metrics
from api.services.workflow.qa.node_summary import (
CONVERSATION_SUMMARY_SYSTEM_PROMPT,
ensure_node_summaries,
get_node_summary_text,
)
from api.services.workflow.qa.tracing import (
add_qa_span_to_trace,
setup_langfuse_parent_context,
)
from api.utils.template_renderer import render_template
async def _generate_conversation_summary(
    client: AsyncOpenAI,
    model: str,
    transcript: str,
    parent_ctx,
    node_name: str,
    total_token_usage: dict,
) -> str:
    """Summarise the conversation that took place before the current node.

    The request is traced to Langfuse as
    ``conversation-summary-before-{node_name}`` and its token counts are added
    to ``total_token_usage``. Any failure is logged as a warning and an empty
    string is returned instead.
    """
    messages = [
        {"role": "system", "content": CONVERSATION_SUMMARY_SYSTEM_PROMPT},
        {"role": "user", "content": f"## Conversation\n{transcript}"},
    ]

    summary = ""
    try:
        response = await client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=0,
        )
        generated = response.choices[0].message.content or ""
        accumulate_token_usage(total_token_usage, response)
        add_qa_span_to_trace(
            parent_ctx,
            model,
            messages,
            generated,
            f"conversation-summary-before-{node_name}",
        )
        # Only commit the text once bookkeeping and tracing have succeeded.
        summary = generated
    except Exception as exc:
        logger.warning(
            f"Failed to generate conversation summary before {node_name}: {exc}"
        )
    return summary
async def run_per_node_qa_analysis(
    qa_node_data: dict[str, Any],
    workflow_run: WorkflowRunModel,
    workflow_run_id: int,
    workflow_definition: dict,
    definition_id: int | None,
) -> dict[str, Any]:
    """Run per-node QA analysis on a completed workflow run.

    Splits the call by node, generates per-node summaries and conversation
    context, then evaluates each node segment individually.
    Falls back to whole-call QA if events lack node_id.

    Args:
        qa_node_data: The QA node's data dict; ``qa_system_prompt`` is read
            here and the dict is also passed on for LLM config resolution.
        workflow_run: The run whose ``logs["realtime_feedback_events"]`` are
            analysed.
        workflow_run_id: The workflow run ID (used in log messages).
        workflow_definition: Passed to ``ensure_node_summaries``.
        definition_id: Passed to ``ensure_node_summaries``.

    Returns:
        Dict with node_results, token_usage, model. ``token_usage`` is only
        present when at least one count was accumulated. Early exits return
        ``{"error": <reason>, "node_results": {}}``.
    """
    logs = workflow_run.logs or {}
    rtf_events = logs.get("realtime_feedback_events", [])
    if not rtf_events:
        logger.warning(f"No realtime_feedback_events for run {workflow_run_id}")
        return {"error": "no_transcript", "node_results": {}}

    # Try to split by node
    node_splits = split_events_by_node(rtf_events)
    if not node_splits:
        # Fall back to whole-call QA
        logger.info(
            f"Events lack node_id for run {workflow_run_id}, falling back to whole-call QA"
        )
        return await _run_whole_call_qa_analysis(
            qa_node_data, workflow_run, workflow_run_id
        )

    system_prompt = qa_node_data.get("qa_system_prompt", "")
    if not system_prompt:
        logger.warning("No system prompt defined for QA Node")
        return {"error": "no_system_prompt", "node_results": {}}

    # Resolve LLM config
    model, api_key, base_url = await resolve_llm_config(qa_node_data, workflow_run)
    if not api_key:
        logger.warning(
            f"No LLM API key configured for QA analysis on run {workflow_run_id}"
        )
        return {"error": "no_api_key", "node_results": {}}

    # Ensure node summaries
    node_summaries = await ensure_node_summaries(
        workflow_definition, definition_id, workflow_run, qa_node_data
    )

    # Set up Langfuse tracing
    parent_ctx = setup_langfuse_parent_context(workflow_run)

    # Build LLM client (base_url is only passed when one was resolved)
    client_kwargs: dict[str, Any] = {"api_key": api_key}
    if base_url:
        client_kwargs["base_url"] = base_url
    client = AsyncOpenAI(**client_kwargs)

    total_token_usage: dict[str, int] = {}
    node_results: dict[str, Any] = {}
    prior_conversation: list[dict] = []  # Running accumulation of all prior nodes

    for idx, (node_id, node_name, node_events) in enumerate(node_splits):
        # Build this node's conversation and transcript
        node_conversation = build_conversation_structure(node_events)
        node_transcript = format_transcript(node_conversation)
        if not node_transcript:
            # Nothing to evaluate; the node gets no entry in node_results.
            continue

        # Compute per-node metrics (no call duration is passed for a single node)
        node_metrics = compute_call_metrics(node_events)

        # Get node summary
        node_summary = get_node_summary_text(node_summaries, node_id)

        # Generate conversation summary from prior nodes (if not first)
        previous_conversation_summary = ""
        if idx > 0 and prior_conversation:
            prior_transcript = format_transcript(prior_conversation)
            previous_conversation_summary = await _generate_conversation_summary(
                client,
                model,
                prior_transcript,
                parent_ctx,
                node_name,
                total_token_usage,
            )

        # Substitute placeholders in the user's system prompt
        template_context = {
            "node_summary": node_summary,
            "previous_conversation_summary": previous_conversation_summary,
            "transcript": node_transcript,
            "metrics": json.dumps(node_metrics, indent=2),
        }
        system_content = render_template(system_prompt, template_context)

        messages = [
            {"role": "system", "content": system_content},
            {"role": "user", "content": f"## Transcript\n{node_transcript}"},
        ]

        # Call QA LLM
        try:
            response = await client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0,
            )
            raw_response = response.choices[0].message.content
            accumulate_token_usage(total_token_usage, response)
        except Exception as e:
            logger.error(
                f"QA LLM call failed for node '{node_name}' on run {workflow_run_id}: {e}"
            )
            # Record the failure for this node and keep going with the others.
            node_results[node_id] = {
                "node_name": node_name,
                "error": str(e),
                "tags": [],
                "summary": "",
                "score": None,
            }
            # The node's conversation still counts as prior context for later nodes.
            prior_conversation.extend(node_conversation)
            continue

        # Trace
        span_name = f"qa-node-{node_name}"
        add_qa_span_to_trace(parent_ctx, model, messages, raw_response, span_name)

        # Parse response
        node_result: dict[str, Any] = {
            "node_name": node_name,
            "raw_response": raw_response,
        }
        try:
            parsed = parse_llm_json(raw_response)
            node_result["tags"] = parsed.get("tags", [])
            node_result["summary"] = parsed.get("summary", "")
            node_result["score"] = parsed.get("call_quality_score")
            node_result["overall_sentiment"] = parsed.get("overall_sentiment")
        except (json.JSONDecodeError, ValueError):
            # Unparseable reply: keep raw_response and fall back to empty fields.
            node_result["tags"] = []
            node_result["summary"] = ""
            node_result["score"] = None

        node_results[node_id] = node_result

        # Append this node's conversation to running total
        prior_conversation.extend(node_conversation)

    result: dict[str, Any] = {
        "node_results": node_results,
        "model": model,
    }
    if total_token_usage:
        result["token_usage"] = total_token_usage

    return result
async def _run_whole_call_qa_analysis(
    qa_node_data: dict[str, Any],
    workflow_run: WorkflowRunModel,
    workflow_run_id: int,
) -> dict[str, Any]:
    """Evaluate the whole call in one pass (fallback when events lack node_id).

    The single evaluation is stored under the ``"whole_call"`` key of
    ``node_results`` so callers get the same shape as per-node analysis.

    Returns:
        ``{"node_results": {"whole_call": {...}}, "model": ...}`` plus
        ``token_usage`` when counts were reported, or
        ``{"error": <reason>, "node_results": {}}`` on an early exit or a
        failed LLM call.
    """

    def _abort(reason: str) -> dict[str, Any]:
        # Error shape shared by every early exit.
        return {"error": reason, "node_results": {}}

    rtf_events = (workflow_run.logs or {}).get("realtime_feedback_events", [])
    if not rtf_events:
        logger.warning(f"No realtime_feedback_events for run {workflow_run_id}")
        return _abort("no_transcript")

    transcript = format_transcript(build_conversation_structure(rtf_events))
    if not transcript:
        logger.warning(f"Empty transcript for run {workflow_run_id}")
        return _abort("empty_transcript")

    # Call-level metrics, including the duration recorded in usage_info.
    call_duration = (workflow_run.usage_info or {}).get("call_duration_seconds")
    metrics = compute_call_metrics(rtf_events, call_duration)

    system_prompt = qa_node_data.get("qa_system_prompt", "")
    if not system_prompt:
        logger.warning("No system prompt defined for QA Node")
        return _abort("no_system_prompt")

    model, api_key, base_url = await resolve_llm_config(qa_node_data, workflow_run)
    if not api_key:
        logger.warning(
            f"No LLM API key configured for QA analysis on run {workflow_run_id}"
        )
        return _abort("no_api_key")

    # Per-node placeholders have no value here, so they render as empty text.
    system_content = render_template(
        system_prompt,
        {
            "node_summary": "",
            "previous_conversation_summary": "",
            "transcript": transcript,
            "metrics": json.dumps(metrics, indent=2),
        },
    )
    messages = [
        {"role": "system", "content": system_content},
        {"role": "user", "content": f"## Transcript\n{transcript}"},
    ]

    client_kwargs: dict[str, Any] = {"api_key": api_key}
    if base_url:
        client_kwargs["base_url"] = base_url
    client = AsyncOpenAI(**client_kwargs)

    try:
        response = await client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=0,
        )
        raw_response = response.choices[0].message.content
    except Exception as exc:
        logger.error(f"QA LLM call failed for run {workflow_run_id}: {exc}")
        return _abort(str(exc))

    token_usage: dict[str, int] = {}
    accumulate_token_usage(token_usage, response)

    node_result: dict[str, Any] = {
        "node_name": "whole_call",
        "raw_response": raw_response,
    }
    try:
        parsed = parse_llm_json(raw_response)
        node_result.update(
            tags=parsed.get("tags", []),
            summary=parsed.get("summary", ""),
            score=parsed.get("call_quality_score"),
            overall_sentiment=parsed.get("overall_sentiment"),
        )
    except (json.JSONDecodeError, ValueError):
        node_result.update(tags=[], summary="", score=None)

    # Langfuse tracing
    parent_ctx = setup_langfuse_parent_context(workflow_run)
    add_qa_span_to_trace(parent_ctx, model, messages, raw_response, "qa-analysis")

    result: dict[str, Any] = {
        "node_results": {"whole_call": node_result},
        "model": model,
    }
    if token_usage:
        result["token_usage"] = token_usage
    return result

View file

@ -0,0 +1,109 @@
"""Conversation building, transcript formatting, and per-node event splitting."""
from collections import OrderedDict
from datetime import datetime
from pipecat.utils.enums import RealtimeFeedbackType
def build_conversation_structure(logs: list[dict]) -> list[dict]:
    """Turn raw call logs into an ordered list of conversation entries.

    Bot text, final user transcriptions and function-call starts are kept
    (as speakers "assistant", "user" and "tool_call"); all other events are
    skipped. Offsets are measured from the first log entry's ``timestamp``.

    Returns:
        One dict per kept event with ``time_from_start_seconds``, ``speaker``,
        ``text``, ``node_name`` and ``turn``; an empty list for empty input.
    """
    if not logs:
        return []

    def _utterance_time(evt: dict) -> datetime:
        # Prefer the payload's own timestamp; fall back to the event's.
        try:
            return datetime.fromisoformat(evt["payload"]["timestamp"])
        except KeyError:
            return datetime.fromisoformat(evt["timestamp"])

    origin = datetime.fromisoformat(logs[0]["timestamp"])
    entries = []

    for evt in logs:
        kind = evt["type"]

        if kind == RealtimeFeedbackType.BOT_TEXT.value:
            who = "assistant"
            said = evt["payload"]["text"]
            when = _utterance_time(evt)
        elif kind == RealtimeFeedbackType.USER_TRANSCRIPTION.value and evt[
            "payload"
        ].get("final", False):
            who = "user"
            said = evt["payload"]["text"]
            when = _utterance_time(evt)
        elif kind == RealtimeFeedbackType.FUNCTION_CALL_START.value:
            who = "tool_call"
            said = evt["payload"].get("function_name", "unknown")
            when = datetime.fromisoformat(evt["timestamp"])
        else:
            continue

        offset = (when - origin).total_seconds()
        entries.append(
            {
                "time_from_start_seconds": round(offset, 2),
                "speaker": who,
                "text": said,
                "node_name": evt.get("node_name", ""),
                "turn": evt.get("turn", 0),
            }
        )

    return entries
def format_transcript(conversation: list[dict]) -> str:
    """Render conversation entries as a newline-separated transcript for the LLM.

    Lines look like ``[<offset>s] <speaker>: <text>`` with the offset shown to
    one decimal place. Tool calls are labelled ``[tool_call]`` in place of a
    speaker name.
    """

    def _line(entry: dict) -> str:
        is_tool_call = entry["speaker"] == "tool_call"
        stamp = f"[{entry['time_from_start_seconds']:.1f}s] "
        label = "[tool_call]" if is_tool_call else entry["speaker"]
        return f"{stamp}{label}: {entry['text']}"

    return "\n".join(_line(entry) for entry in conversation)
def split_events_by_node(
    rtf_events: list[dict],
) -> list[tuple[str, str, list[dict]]]:
    """Group realtime_feedback_events by the node they belong to.

    Returns (node_id, node_name, events) tuples ordered by each node's first
    appearance. The name is taken from the node's first event. Nodes with no
    BOT_TEXT or USER_TRANSCRIPTION event are dropped. If any event has no
    ``node_id``, an empty list is returned so the caller can fall back.
    """
    conversational_types = {
        RealtimeFeedbackType.BOT_TEXT.value,
        RealtimeFeedbackType.USER_TRANSCRIPTION.value,
    }

    # node_id -> (node_name, events); dict insertion order gives first-seen order.
    grouped: dict[str, tuple[str, list[dict]]] = {}
    for event in rtf_events:
        node_id = event.get("node_id")
        if not node_id:
            return []  # Events lack node_id — caller should fall back
        if node_id not in grouped:
            grouped[node_id] = (event.get("node_name", ""), [])
        grouped[node_id][1].append(event)

    return [
        (node_id, node_name, events)
        for node_id, (node_name, events) in grouped.items()
        if any(evt["type"] in conversational_types for evt in events)
    ]

View file

@ -0,0 +1,98 @@
"""LLM configuration resolution and token usage accumulation."""
from api.db import db_client
from api.db.models import WorkflowRunModel
def _provider_base_url(provider: str | None, endpoint: str = "") -> str | None:
"""Return the base URL for a given LLM provider."""
if provider == "openrouter":
return "https://openrouter.ai/api/v1"
if provider == "groq":
return "https://api.groq.com/openai/v1"
if provider == "google":
return "https://generativelanguage.googleapis.com/v1beta/openai/"
if provider == "azure":
return endpoint or None
return None
async def resolve_llm_config(
    qa_node_data: dict, workflow_run: WorkflowRunModel
) -> tuple[str, str, str | None]:
    """Pick the model, API key and base URL for QA analysis.

    With ``qa_use_workflow_llm`` truthy (the default) the user's configured
    LLM is used, except that a ``qa_model`` other than ``"default"`` replaces
    the model. Otherwise the QA node's own ``qa_model`` / ``qa_api_key`` /
    ``qa_provider`` / ``qa_endpoint`` settings are returned directly.

    Returns:
        (model, api_key, base_url) tuple
    """
    if qa_node_data.get("qa_use_workflow_llm", True):
        model, api_key, base_url = await resolve_user_llm_config(workflow_run)
        model_override = qa_node_data.get("qa_model", "default")
        if model_override and model_override != "default":
            model = model_override
        return model, api_key, base_url

    # The QA node carries its own LLM configuration.
    node_base_url = _provider_base_url(
        qa_node_data.get("qa_provider"),
        qa_node_data.get("qa_endpoint", ""),
    )
    return (
        qa_node_data.get("qa_model"),
        qa_node_data.get("qa_api_key"),
        node_base_url,
    )
async def resolve_user_llm_config(
    workflow_run: WorkflowRunModel,
) -> tuple[str, str, str | None]:
    """Read the LLM settings of the user who owns the run's workflow.

    When the run has no workflow or user, or the stored configuration has no
    ``llm`` section, the defaults apply: provider ``"openai"``, an empty API
    key and model ``"gpt-4.1"``.

    Returns:
        (model, api_key, base_url) tuple
    """
    workflow = workflow_run.workflow
    user_id = workflow.user.id if workflow and workflow.user else None

    llm_config: dict = {}
    if user_id:
        user_configuration = await db_client.get_user_configurations(user_id)
        llm_config = user_configuration.model_dump(exclude_none=True).get("llm", {})

    provider = llm_config.get("provider", "openai")
    base_url = _provider_base_url(provider, llm_config.get("endpoint", ""))
    # For openrouter, a user-configured base_url replaces the built-in one.
    if provider == "openrouter" and llm_config.get("base_url"):
        base_url = llm_config["base_url"]

    return (
        llm_config.get("model", "gpt-4.1"),
        llm_config.get("api_key", ""),
        base_url,
    )
def accumulate_token_usage(total: dict, response) -> None:
    """Add the token counts of one LLM response into ``total`` in place.

    Nothing happens when the response has no usage data. Missing or ``None``
    counts are treated as zero. ``cache_creation_input_tokens`` is only
    written when the response reports a value for it.
    """
    usage = response.usage
    if not usage:
        return

    for counter in ("prompt_tokens", "completion_tokens", "total_tokens"):
        total[counter] = total.get(counter, 0) + (getattr(usage, counter) or 0)

    cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0
    total["cache_read_input_tokens"] = (
        total.get("cache_read_input_tokens", 0) + cache_read
    )

    cache_creation = getattr(usage, "cache_creation_input_tokens", None)
    if cache_creation is not None:
        running = total.get("cache_creation_input_tokens") or 0
        total["cache_creation_input_tokens"] = running + cache_creation

View file

@ -0,0 +1,37 @@
"""Call metrics computation from raw event logs."""
from pipecat.utils.enums import RealtimeFeedbackType
def compute_call_metrics(
logs: list[dict], call_duration_seconds: float | None = None
) -> dict:
"""Pre-compute quantitative metrics from raw call logs."""
latencies = []
ttfb_values = []
for event in logs:
if event["type"] == RealtimeFeedbackType.LATENCY_MEASURED.value:
latencies.append(event["payload"]["latency_seconds"])
elif event["type"] == RealtimeFeedbackType.TTFB_METRIC.value:
ttfb_values.append(event["payload"]["ttfb_seconds"])
turns = set()
for event in logs:
if event["type"] in (
RealtimeFeedbackType.USER_TRANSCRIPTION.value,
RealtimeFeedbackType.BOT_TEXT.value,
):
turns.add(event.get("turn", 0))
return {
"call_duration_seconds": call_duration_seconds,
"num_turns": len(turns),
"avg_latency_seconds": (
round(sum(latencies) / len(latencies), 2) if latencies else None
),
"avg_ttfb_seconds": (
round(sum(ttfb_values) / len(ttfb_values), 2) if ttfb_values else None
),
"max_latency_seconds": round(max(latencies), 2) if latencies else None,
}

View file

@ -0,0 +1,184 @@
"""Node summary generation and caching for per-node QA analysis."""
from typing import Any
from loguru import logger
from openai import AsyncOpenAI
from api.db import db_client
from api.db.models import WorkflowRunModel
from api.services.workflow.dto import NodeType
from api.services.workflow.qa.llm_config import resolve_llm_config
from api.services.workflow.qa.tracing import create_node_summary_trace
# System prompt asking the LLM for a 2-4 sentence summary of one part of an
# agent script. NOTE(review): presumably used by the node-summary generation
# in this module; the call site is not visible in this view, so confirm.
# The text is sent to the model verbatim, so any wording change alters the
# generated summaries.
NODE_SUMMARY_SYSTEM_PROMPT = (
    "You are analyzing a voice AI agent script. This is only a part of a larger script. "
    "Produce a concise summary (2-4 sentences) describing this script purpose, "
    "what the agent should accomplish, and key behaviors. We will be using this "
    "summary to do a QA on the conversation that the agent would do with someone "
    "so try to capture the nuances of the script as much as possible."
)

# System prompt asking the LLM for a 3-5 sentence summary of a portion of a
# conversation. It is exported for the QA orchestrator, which sends it as the
# system message when summarising the conversation that preceded a node.
CONVERSATION_SUMMARY_SYSTEM_PROMPT = (
    "You are summarizing a portion of a voice AI conversation. "
    "Produce a concise summary (3-5 sentences) covering key topics, "
    "information exchanged, and current state. We would be using this "
    "summary in doing a QA of the conversation that the voice AI agent "
    "did with someone so try to capture the nuances of the conversation "
    "as much as possible."
)
def get_node_summary_text(node_summaries: dict, node_id: str) -> str:
    """Look up the summary text stored for ``node_id``.

    Entries may be in the current format (a dict with a "summary" key) or the
    legacy format (a plain string). A missing entry, a ``None`` entry, or a
    dict without "summary" gives an empty string.
    """
    entry = node_summaries.get(node_id)
    if isinstance(entry, str):
        # Legacy format: the entry is the summary itself.
        return entry
    return "" if entry is None else entry.get("summary", "")
def _build_node_summary_input(
    node_name: str,
    node_data: dict,
    tool_map: dict[str, Any],
    outgoing_edges: list[dict],
) -> str:
    """Render a node's name, prompt and available tools as the LLM user message."""
    parts = [f"Node name: {node_name}"]

    if node_data.get("prompt"):
        parts.append(f"Agent prompt:\n{node_data['prompt']}")

    # Collect all available tools: custom tools + outgoing edges
    tool_descriptions = []
    for tool_uuid in node_data.get("tool_uuids") or []:
        tool_info = tool_map.get(tool_uuid)
        if tool_info:
            desc = f"- {tool_info['name']}"
            if tool_info["description"]:
                desc += f": {tool_info['description']}"
            tool_descriptions.append(desc)

    for edge in outgoing_edges:
        edge_data = edge.get("data") or {}
        label = edge_data.get("label", "")
        condition = edge_data.get("condition", "")
        if label:
            desc = f"- {label}"
            if condition:
                desc += f": {condition}"
            tool_descriptions.append(desc)

    if tool_descriptions:
        parts.append("Available tools:\n" + "\n".join(tool_descriptions))

    return "\n".join(parts)


async def ensure_node_summaries(
    workflow_definition: dict,
    definition_id: int | None,
    workflow_run: WorkflowRunModel,
    qa_node_data: dict,
) -> dict[str, Any]:
    """Ensure every agent/start node of the definition has a cached summary.

    Nodes of type ``NodeType.agentNode`` or ``NodeType.startNode`` that have no
    entry in ``workflow_definition["node_summaries"]`` are summarized with one
    LLM call each, and the merged result is written back to the definition.

    Args:
        workflow_definition: The workflow JSON (``nodes``, ``edges`` and an
            optional ``node_summaries`` mapping).
        definition_id: Id of the definition row to persist into; persistence
            is skipped when falsy.
        workflow_run: Run used to resolve the LLM configuration and the
            organization that owns the referenced tools.
        qa_node_data: Data of the QA node, forwarded to ``resolve_llm_config``.

    Returns:
        The node_summaries dict:
        ``{node_id: {"summary": "...", "trace_url": "..."}, ...}``.
        A node whose generation failed is returned with an empty summary, but
        that placeholder is NOT persisted so a later run can retry it.
    """
    # `or {}` / `or []` also cover definitions that store an explicit null.
    existing_summaries: dict[str, Any] = (
        workflow_definition.get("node_summaries") or {}
    )
    nodes = workflow_definition.get("nodes") or []

    summarizable_types = {NodeType.agentNode.value, NodeType.startNode.value}
    nodes_needing_summary = [
        n
        for n in nodes
        if n.get("type") in summarizable_types
        # A node without an id cannot be keyed in the summaries mapping.
        and n.get("id") is not None
        and n.get("id") not in existing_summaries
    ]

    if not nodes_needing_summary:
        return existing_summaries

    model, api_key, base_url = await resolve_llm_config(qa_node_data, workflow_run)
    if not api_key:
        logger.warning("No API key for node summary generation, skipping")
        return existing_summaries

    client_kwargs: dict[str, Any] = {"api_key": api_key}
    if base_url:
        client_kwargs["base_url"] = base_url
    client = AsyncOpenAI(**client_kwargs)

    updated_summaries = dict(existing_summaries)

    # Collect all tool UUIDs across nodes and fetch them in one query
    all_tool_uuids: set[str] = set()
    for node in nodes_needing_summary:
        all_tool_uuids.update((node.get("data") or {}).get("tool_uuids") or [])

    tool_map: dict[str, Any] = {}
    if all_tool_uuids:
        organization_id = (
            workflow_run.workflow.organization_id if workflow_run.workflow else None
        )
        if organization_id:
            try:
                tools = await db_client.get_tools_by_uuids(
                    list(all_tool_uuids), organization_id
                )
                for t in tools:
                    tool_map[t.tool_uuid] = {
                        "name": t.name,
                        "description": t.description or "",
                    }
            except Exception as e:
                # Best effort: summaries are still generated without tool info.
                logger.warning(f"Failed to fetch tools for node summaries: {e}")

    # Build a map of outgoing edges per node (edges are also tool calls)
    outgoing_edges_by_node: dict[str, list[dict]] = {}
    for edge in workflow_definition.get("edges") or []:
        source = edge.get("source")
        if source:
            outgoing_edges_by_node.setdefault(source, []).append(edge)

    # Nodes whose LLM call failed; their placeholder must not be cached.
    failed_node_ids: set[str] = set()

    for node in nodes_needing_summary:
        node_id = node["id"]
        node_data = node.get("data") or {}
        node_name = node_data.get("name", "Unnamed")

        node_info = _build_node_summary_input(
            node_name,
            node_data,
            tool_map,
            outgoing_edges_by_node.get(node_id, []),
        )
        messages = [
            {"role": "system", "content": NODE_SUMMARY_SYSTEM_PROMPT},
            {"role": "user", "content": node_info},
        ]

        try:
            response = await client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0,
            )
            summary_text = response.choices[0].message.content or ""
        except Exception as e:
            logger.warning(f"Failed to generate summary for node {node_id}: {e}")
            updated_summaries[node_id] = {"summary": ""}
            failed_node_ids.add(node_id)
            continue

        # Create a Langfuse trace for this summary generation
        trace_url = create_node_summary_trace(model, messages, summary_text, node_name)

        entry: dict[str, Any] = {"summary": summary_text}
        if trace_url:
            entry["trace_url"] = trace_url
        updated_summaries[node_id] = entry

    # Persist to DB. Failure placeholders are left out: once a node id is
    # stored it is never regenerated, so caching an empty summary after a
    # transient error would disable that node's summary for good.
    persistable_summaries = {
        node_id: entry
        for node_id, entry in updated_summaries.items()
        if node_id not in failed_node_ids
    }
    if definition_id and persistable_summaries != existing_summaries:
        try:
            await db_client.update_definition_node_summaries(
                definition_id, persistable_summaries
            )
        except Exception as e:
            logger.warning(f"Failed to persist node summaries: {e}")

    return updated_summaries

View file

@ -0,0 +1,154 @@
"""Langfuse / OpenTelemetry tracing helpers for QA analysis."""
import re
from loguru import logger
from api.db.models import WorkflowRunModel
def extract_trace_id(gathered_context: dict) -> str | None:
"""Extract Langfuse trace_id from gathered_context trace_url.
URL format: https://langfuse.dograh.com/project/<project_id>/traces/<trace_id>
"""
trace_url = gathered_context.get("trace_url")
if not trace_url:
return None
try:
match = re.search(r"/traces/([a-fA-F0-9]+)$", trace_url)
if match:
return match.group(1)
except Exception:
pass
return None
def setup_langfuse_parent_context(workflow_run: WorkflowRunModel):
    """Build an OTEL parent context pointing at the run's Langfuse trace.

    The trace id is read from the ``trace_url`` stored in the run's
    ``gathered_context``; spans started with the returned context are attached
    to that existing trace.

    Returns:
        The parent context object, or ``None`` when tracing is disabled, the
        run has no trace id, or anything fails while setting it up.
    """
    try:
        from opentelemetry.trace import (
            NonRecordingSpan,
            SpanContext,
            TraceFlags,
            set_span_in_context,
        )

        from api.services.pipecat.tracing_config import (
            is_tracing_enabled,
            setup_tracing_exporter,
        )

        if is_tracing_enabled():
            setup_tracing_exporter()

            hex_trace_id = extract_trace_id(workflow_run.gathered_context or {})
            if hex_trace_id:
                # Synthetic remote parent: only the trace id is known, so a
                # fixed span id is used and the sampled flag is set.
                remote_parent = NonRecordingSpan(
                    SpanContext(
                        trace_id=int(hex_trace_id, 16),
                        span_id=0x1,
                        is_remote=True,
                        trace_flags=TraceFlags(0x01),
                    )
                )
                return set_span_in_context(remote_parent)

            logger.debug("No trace_id found, skipping Langfuse tracing")

        return None
    except Exception as e:
        logger.warning(f"Failed to set up Langfuse parent context: {e}")
        return None
def add_qa_span_to_trace(
    parent_ctx,
    model: str,
    messages: list[dict],
    output: str,
    span_name: str,
) -> None:
    """Record one LLM call as a child span of the conversation trace.

    Args:
        parent_ctx: OTEL context to attach the span to; when ``None`` the
            function does nothing.
        model: Model name recorded on the span.
        messages: Chat messages that were sent to the model.
        output: Text the model returned.
        span_name: Name of the span, also recorded as the operation name.

    Failures are logged as warnings and never raised.
    """
    if parent_ctx is None:
        return

    try:
        from opentelemetry import trace as otel_trace

        from pipecat.utils.tracing.service_attributes import add_llm_span_attributes

        llm_attributes = {
            "service_name": "OpenAILLMService",
            "model": model,
            "operation_name": span_name,
            "messages": messages,
            "output": output,
            "stream": False,
            "parameters": {"temperature": 0},
        }

        pipecat_tracer = otel_trace.get_tracer("pipecat")
        with pipecat_tracer.start_as_current_span(
            span_name, context=parent_ctx
        ) as qa_span:
            add_llm_span_attributes(qa_span, **llm_attributes)
    except Exception as e:
        logger.warning(f"Failed to trace span '{span_name}' to Langfuse: {e}")
def create_node_summary_trace(
    model: str,
    messages: list[dict],
    output: str,
    node_name: str,
) -> str | None:
    """Record a node summary generation as its own Langfuse trace.

    Args:
        model: Model name recorded on the span.
        messages: Chat messages that were sent to the model.
        output: Summary text the model returned.
        node_name: Node name, used to build the span/operation name.

    Returns:
        The URL of the created trace, or ``None`` when tracing is disabled or
        any step fails (failures are logged as warnings).
    """
    try:
        from opentelemetry import trace as otel_trace
        from opentelemetry.context import Context

        from api.services.pipecat.tracing_config import (
            is_tracing_enabled,
            setup_tracing_exporter,
        )
        from pipecat.utils.tracing.service_attributes import add_llm_span_attributes

        if not is_tracing_enabled():
            return None

        setup_tracing_exporter()

        summary_span_name = f"node-summary-{node_name}"
        pipecat_tracer = otel_trace.get_tracer("pipecat")

        # An empty Context has no parent span, so this starts a new trace.
        with pipecat_tracer.start_as_current_span(
            summary_span_name,
            context=Context(),
        ) as root_span:
            add_llm_span_attributes(
                root_span,
                service_name="OpenAILLMService",
                model=model,
                operation_name=summary_span_name,
                messages=messages,
                output=output,
                stream=False,
                parameters={"temperature": 0},
            )

            # Render the numeric trace id as 32 zero-padded hex digits.
            hex_trace_id = format(root_span.get_span_context().trace_id, "032x")

            from langfuse import get_client

            return get_client().get_trace_url(trace_id=hex_trace_id)
    except Exception as e:
        logger.warning(f"Failed to create node summary trace for '{node_name}': {e}")
        return None

View file

@ -9,7 +9,7 @@ from loguru import logger
from api.constants import BACKEND_API_ENDPOINT
from api.db import db_client
from api.db.models import WorkflowRunModel
from api.services.qa_analysis import run_qa_analysis
from api.services.workflow.qa import run_per_node_qa_analysis
from api.utils.credential_auth import build_auth_header
from api.utils.template_renderer import render_template
from pipecat.utils.enums import EndTaskReason
@ -53,6 +53,8 @@ async def _run_qa_nodes(
qa_nodes: list[dict],
workflow_run: WorkflowRunModel,
workflow_run_id: int,
workflow_definition: dict,
definition_id: int | None,
) -> Dict[str, Any]:
"""Run QA analysis for each enabled QA node and aggregate results.
@ -78,16 +80,33 @@ async def _run_qa_nodes(
try:
logger.info(f"Running QA analysis for node '{node_name}' (#{node_id})")
result = await run_qa_analysis(node_data, workflow_run, workflow_run_id)
result = await run_per_node_qa_analysis(
node_data,
workflow_run,
workflow_run_id,
workflow_definition,
definition_id,
)
results[f"qa_{node_id}"] = result
# Log summary from node_results
node_results = result.get("node_results", {})
logger.info(
f"QA analysis complete for '{node_name}': "
f"score={result.get('score')}, tags={len(result.get('tags', []))}"
f"{len(node_results)} nodes analyzed"
)
except Exception as e:
logger.error(f"QA analysis failed for node '{node_name}': {e}")
results[f"qa_{node_id}"] = {"error": str(e)}
# Collect all unique tags across all QA node results for top-level filtering
all_tags: set[str] = set()
for result in results.values():
for node_result in result.get("node_results", {}).values():
if isinstance(node_result, dict):
all_tags.update(node_result.get("tags", []))
if all_tags:
results["tags"] = sorted(all_tags)
return results
@ -159,8 +178,16 @@ async def run_integrations_post_workflow_run(_ctx, workflow_run_id: int):
logger.warning("No organization found, skipping integrations")
return
# Step 2: Get workflow definition
workflow_definition = workflow_run.workflow.workflow_definition_with_fallback
# Step 2: Get workflow definition (prefer the run-specific definition)
if workflow_run.definition:
workflow_definition = workflow_run.definition.workflow_json
definition_id = workflow_run.definition.id
else:
workflow_definition = (
workflow_run.workflow.workflow_definition_with_fallback
)
definition_id = workflow_run.workflow.current_definition_id
if not workflow_definition:
logger.debug("No workflow definition, skipping integrations")
return
@ -183,7 +210,13 @@ async def run_integrations_post_workflow_run(_ctx, workflow_run_id: int):
# Step 5: Run QA analysis before webhooks
if qa_nodes:
logger.info(f"Found {len(qa_nodes)} QA nodes to execute")
qa_results = await _run_qa_nodes(qa_nodes, workflow_run, workflow_run_id)
qa_results = await _run_qa_nodes(
qa_nodes,
workflow_run,
workflow_run_id,
workflow_definition,
definition_id,
)
if qa_results:
await db_client.update_workflow_run(

View file

@ -22,7 +22,13 @@ import logger from '@/lib/logger';
import { getNextNodeId, getRandomId } from "@/lib/utils";
import { DEFAULT_WORKFLOW_CONFIGURATIONS, WorkflowConfigurations } from "@/types/workflow-configurations";
const DEFAULT_QA_SYSTEM_PROMPT = `You are a QA expert analyzing voice AI call transcripts. Analyze the conversation and return a structured JSON assessment.
const DEFAULT_QA_SYSTEM_PROMPT = `You are a QA analyst evaluating a specific segment of a voice AI conversation.
## Node Purpose
{{node_summary}}
## Previous Conversation Context (For start of conversation, previous conversation summary can be empty.)
{{previous_conversation_summary}}
## Tags to evaluate
@ -42,7 +48,7 @@ Examine the conversation carefully and identify which of the following tags appl
## Call metrics (pre-computed)
Use these alongside the transcript for your analysis:
{metrics}
{{metrics}}
## Output format
@ -56,7 +62,7 @@ Return ONLY a valid JSON object (no markdown):
],
"overall_sentiment": "positive|neutral|negative",
"call_quality_score": <1-10>,
"summary": "1-2 sentence summary of the call"
"summary": "1-2 sentence summary of this segment"
}
If no tags apply, return an empty tags list. Always provide sentiment, score, and summary.`;

View file

@ -268,14 +268,16 @@ const QANodeEditForm = ({
<div className="grid gap-2">
<Label>System Prompt</Label>
<Label className="text-xs text-muted-foreground">
The prompt sent to the LLM for QA analysis. Use {'{metrics}'} placeholder for
call metrics.
The prompt sent to the LLM for per-node QA analysis. Available placeholders:{' '}
{'{{node_summary}}'} (purpose of the current node), {'{{previous_conversation_summary}}'}{' '}
(summary of conversation before this node), {'{{transcript}}'} (this node&apos;s
conversation), {'{{metrics}}'} (call metrics for this node).
</Label>
<Textarea
value={qaSystemPrompt}
onChange={(e) => setQaSystemPrompt(e.target.value)}
className="min-h-[300px] font-mono text-xs"
placeholder="Enter QA analysis system prompt..."
placeholder={`You are a QA analyst evaluating a specific segment of a voice AI conversation.\n\n## Node Purpose\n{{node_summary}}\n\n## Previous Conversation Context\n{{previous_conversation_summary}}\n\n## Call Metrics\n{{metrics}}\n\nEvaluate the transcript and return JSON with:\n- "tags": array of relevant tags\n- "summary": 2-3 sentence summary of this segment\n- "call_quality_score": number 1-10\n- "overall_sentiment": "positive", "neutral", or "negative"`}
/>
</div>