async def update_definition_node_summaries(
    self, definition_id: int, node_summaries: dict
) -> None:
    """Update the node_summaries field within a workflow definition's workflow_json.

    Loads the definition row, replaces the ``node_summaries`` key on a copy of
    its ``workflow_json`` and commits. Does nothing when no definition with the
    given ID exists.

    Args:
        definition_id: The ID of the WorkflowDefinitionModel to update
        node_summaries: Dict mapping node_id to summary data
            (e.g. {"summary": "...", "trace_url": "..."})

    Raises:
        Exception: Whatever the commit raised, re-raised after a rollback.
    """
    async with self.async_session() as session:
        result = await session.execute(
            select(WorkflowDefinitionModel).where(
                WorkflowDefinitionModel.id == definition_id
            )
        )
        definition = result.scalars().first()
        if not definition:
            return

        # Copy and reassign instead of mutating in place (presumably so the
        # ORM registers the change on the JSON column -- confirm against the
        # model definition). Tolerate a NULL workflow_json, which would make
        # dict(None) raise TypeError.
        workflow_json = dict(definition.workflow_json or {})
        workflow_json["node_summaries"] = node_summaries
        definition.workflow_json = workflow_json

        try:
            await session.commit()
        except Exception:
            await session.rollback()
            # Bare raise keeps the original traceback intact.
            raise
-""" - -import json -import re -from datetime import datetime -from typing import Any - -from loguru import logger -from openai import AsyncOpenAI - -from api.db import db_client -from api.db.models import WorkflowRunModel -from api.services.gen_ai.json_parser import parse_llm_json -from pipecat.utils.enums import RealtimeFeedbackType - - -def build_conversation_structure(logs: list[dict]) -> list[dict]: - """Transform raw call logs into a conversation structure for LLM QA analysis.""" - if not logs: - return [] - - start_time = datetime.fromisoformat(logs[0]["timestamp"]) - - conversation = [] - for event in logs: - if event["type"] == RealtimeFeedbackType.BOT_TEXT.value: - speaker = "assistant" - utterance_text = event["payload"]["text"] - event_time = datetime.fromisoformat(event["payload"]["timestamp"]) - elif event["type"] == RealtimeFeedbackType.USER_TRANSCRIPTION.value and event[ - "payload" - ].get("final", False): - speaker = "user" - utterance_text = event["payload"]["text"] - event_time = datetime.fromisoformat(event["payload"]["timestamp"]) - else: - continue - - time_from_start = (event_time - start_time).total_seconds() - - conversation.append( - { - "time_from_start_seconds": round(time_from_start, 2), - "speaker": speaker, - "text": utterance_text, - "node_name": event.get("node_name", ""), - "turn": event.get("turn", 0), - } - ) - - return conversation - - -def format_transcript(conversation: list[dict]) -> str: - """Format conversation structure into a readable transcript string for the LLM.""" - lines = [] - for entry in conversation: - lines.append( - f"[{entry['time_from_start_seconds']:.1f}s] " - f"{entry['speaker']}: {entry['text']}" - ) - return "\n".join(lines) - - -def compute_call_metrics( - logs: list[dict], call_duration_seconds: float | None = None -) -> dict: - """Pre-compute quantitative metrics from raw call logs.""" - latencies = [] - ttfb_values = [] - - for event in logs: - if event["type"] == 
RealtimeFeedbackType.LATENCY_MEASURED.value: - latencies.append(event["payload"]["latency_seconds"]) - elif event["type"] == RealtimeFeedbackType.TTFB_METRIC.value: - ttfb_values.append(event["payload"]["ttfb_seconds"]) - - turns = set() - for event in logs: - if event["type"] in ( - RealtimeFeedbackType.USER_TRANSCRIPTION.value, - RealtimeFeedbackType.BOT_TEXT.value, - ): - turns.add(event.get("turn", 0)) - - return { - "call_duration_seconds": call_duration_seconds, - "num_turns": len(turns), - "avg_latency_seconds": ( - round(sum(latencies) / len(latencies), 2) if latencies else None - ), - "avg_ttfb_seconds": ( - round(sum(ttfb_values) / len(ttfb_values), 2) if ttfb_values else None - ), - "max_latency_seconds": round(max(latencies), 2) if latencies else None, - } - - -def _extract_trace_id(gathered_context: dict) -> str | None: - """Extract Langfuse trace_id from gathered_context trace_url. - - URL format: https://langfuse.dograh.com/project//traces/ - """ - trace_url = gathered_context.get("trace_url") - if not trace_url: - return None - try: - match = re.search(r"/traces/([a-fA-F0-9]+)$", trace_url) - if match: - return match.group(1) - except Exception: - pass - return None - - -def _provider_base_url(provider: str | None, endpoint: str = "") -> str | None: - """Return the base URL for a given LLM provider.""" - if provider == "openrouter": - return "https://openrouter.ai/api/v1" - if provider == "groq": - return "https://api.groq.com/openai/v1" - if provider == "google": - return "https://generativelanguage.googleapis.com/v1beta/openai/" - if provider == "azure": - return endpoint or None - return None - - -async def _resolve_llm_config( - qa_node_data: dict, workflow_run: WorkflowRunModel -) -> tuple[str, str, str | None]: - """Resolve the LLM model, API key, and base URL for QA analysis. - - If the QA node has its own LLM configuration (qa_use_workflow_llm=False), - use those settings directly. Otherwise, fall back to the user's configured LLM. 
- - Returns: - (model, api_key, base_url) tuple - """ - if not qa_node_data.get("qa_use_workflow_llm", True): - return ( - qa_node_data.get("qa_model"), - qa_node_data.get("qa_api_key"), - _provider_base_url( - qa_node_data.get("qa_provider"), - qa_node_data.get("qa_endpoint", ""), - ), - ) - - # Fall back to user's configured LLM - user_id = None - if workflow_run.workflow and workflow_run.workflow.user: - user_id = workflow_run.workflow.user.id - - llm_config: dict = {} - if user_id: - user_configuration = await db_client.get_user_configurations(user_id) - llm_config = user_configuration.model_dump(exclude_none=True).get("llm", {}) - - provider = llm_config.get("provider", "openai") - api_key = llm_config.get("api_key", "") - - qa_model = qa_node_data.get("qa_model", "default") - if qa_model and qa_model != "default": - model = qa_model - else: - model = llm_config.get("model", "gpt-4.1") - - base_url = _provider_base_url(provider, llm_config.get("endpoint", "")) - # For openrouter, prefer user-configured base_url if set - if provider == "openrouter" and llm_config.get("base_url"): - base_url = llm_config["base_url"] - - return model, api_key, base_url - - -async def run_qa_analysis( - qa_node_data: dict[str, Any], - workflow_run: WorkflowRunModel, - workflow_run_id: int, -) -> dict[str, Any]: - """Run QA analysis on a completed workflow run. 
- - Args: - qa_node_data: The QA node's data dict from workflow definition - workflow_run: The workflow run model with logs and context - workflow_run_id: The workflow run ID - - Returns: - Dict with tags, summary, score, raw_response - """ - # Extract transcript from logs - logs = workflow_run.logs or {} - rtf_events = logs.get("realtime_feedback_events", []) - if not rtf_events: - logger.warning(f"No realtime_feedback_events for run {workflow_run_id}") - return {"error": "no_transcript", "tags": [], "summary": "", "score": None} - - conversation = build_conversation_structure(rtf_events) - transcript = format_transcript(conversation) - if not transcript: - logger.warning(f"Empty transcript for run {workflow_run_id}") - return {"error": "empty_transcript", "tags": [], "summary": "", "score": None} - - # Compute call metrics - usage_info = workflow_run.usage_info or {} - call_duration = usage_info.get("call_duration_seconds") - metrics = compute_call_metrics(rtf_events, call_duration) - - # Resolve LLM config - system_prompt = qa_node_data.get("qa_system_prompt", "") - if not system_prompt: - logger.warning("No system prompt defined for QA Node") - return {"error": "no_system_prompt", "tags": [], "summary": "", "score": None} - - model, api_key, base_url = await _resolve_llm_config(qa_node_data, workflow_run) - - if not api_key: - logger.warning( - f"No LLM API key configured for QA analysis on run {workflow_run_id}" - ) - return {"error": "no_api_key", "tags": [], "summary": "", "score": None} - - # Build messages - system_content = system_prompt.replace("{metrics}", json.dumps(metrics, indent=2)) - messages = [ - {"role": "system", "content": system_content}, - {"role": "user", "content": f"## Transcript\n{transcript}"}, - ] - - # Call LLM - client_kwargs: dict[str, Any] = {"api_key": api_key} - if base_url: - client_kwargs["base_url"] = base_url - - client = AsyncOpenAI(**client_kwargs) - - try: - response = await client.chat.completions.create( - model=model, - 
messages=messages, - temperature=0, - ) - raw_response = response.choices[0].message.content - except Exception as e: - logger.error(f"QA LLM call failed for run {workflow_run_id}: {e}") - return {"error": str(e), "tags": [], "summary": "", "score": None} - - # Extract token usage from LLM response - token_usage = None - if response.usage: - token_usage = { - "prompt_tokens": response.usage.prompt_tokens or 0, - "completion_tokens": response.usage.completion_tokens or 0, - "total_tokens": response.usage.total_tokens or 0, - "cache_read_input_tokens": getattr( - response.usage, "cache_read_input_tokens", 0 - ) - or 0, - "cache_creation_input_tokens": getattr( - response.usage, "cache_creation_input_tokens", None - ), - } - - # Parse response - result: dict[str, Any] = {"raw_response": raw_response, "model": model} - if token_usage: - result["token_usage"] = token_usage - try: - parsed = parse_llm_json(raw_response) - result["tags"] = parsed.get("tags", []) - result["summary"] = parsed.get("summary", "") - result["score"] = parsed.get("call_quality_score") - result["overall_sentiment"] = parsed.get("overall_sentiment") - except (json.JSONDecodeError, ValueError): - result["tags"] = [] - result["summary"] = "" - result["score"] = None - - # Langfuse tracing — attach QA generation to the conversation trace - _add_qa_span_to_conversation_trace( - workflow_run, model, messages, raw_response, result - ) - - return result - - -def _add_qa_span_to_conversation_trace( - workflow_run: WorkflowRunModel, - model: str, - messages: list[dict], - raw_response: str, - result: dict, -): - """Attach the QA generation to the existing Langfuse conversation trace. - - Uses OpenTelemetry directly to create a child span under the existing trace, - matching the same attribute format used by the pipecat pipeline (gen_ai.*). 
- """ - try: - from opentelemetry import trace as otel_trace - from opentelemetry.trace import ( - NonRecordingSpan, - SpanContext, - TraceFlags, - set_span_in_context, - ) - - from api.services.pipecat.tracing_config import ( - is_tracing_enabled, - setup_tracing_exporter, - ) - from pipecat.utils.tracing.service_attributes import add_llm_span_attributes - - if not is_tracing_enabled(): - return - - # Ensure the OTEL exporter is initialized (idempotent — no-op if - # already called in the pipeline process, required in the ARQ worker). - setup_tracing_exporter() - - gathered_context = workflow_run.gathered_context or {} - trace_id = _extract_trace_id(gathered_context) - if not trace_id: - logger.debug("No trace_id found, skipping Langfuse QA trace") - return - - tracer = otel_trace.get_tracer("pipecat") - - # Create a remote parent context from the existing trace ID - parent_span_ctx = SpanContext( - trace_id=int(trace_id, 16), - span_id=0x1, # dummy parent span id - is_remote=True, - trace_flags=TraceFlags(0x01), - ) - parent_ctx = set_span_in_context(NonRecordingSpan(parent_span_ctx)) - - # Create a child span under the existing trace - with tracer.start_as_current_span( - "qa-analysis", - context=parent_ctx, - ) as span: - add_llm_span_attributes( - span, - service_name="OpenAILLMService", - model=model, - operation_name="qa-analysis", - messages=messages, - output=raw_response, - stream=False, - parameters={"temperature": 0}, - ) - - except Exception as e: - logger.warning(f"Failed to trace QA to Langfuse: {e}") diff --git a/api/services/workflow/qa/__init__.py b/api/services/workflow/qa/__init__.py new file mode 100644 index 0000000..209d5b5 --- /dev/null +++ b/api/services/workflow/qa/__init__.py @@ -0,0 +1,12 @@ +"""QA analysis service for post-call quality assessment. + +Runs LLM-based analysis on call transcripts, traces under the same +Langfuse trace as the conversation, and returns structured results. 
def _build_llm_client(api_key: str, base_url: str | None) -> AsyncOpenAI:
    """Create the OpenAI-compatible client, passing base_url only when one is set."""
    client_kwargs: dict[str, Any] = {"api_key": api_key}
    if base_url:
        client_kwargs["base_url"] = base_url
    return AsyncOpenAI(**client_kwargs)


def _parse_node_result(node_name: str, raw_response: str | None) -> dict[str, Any]:
    """Convert a raw QA completion into a result dict with a stable shape.

    The returned dict always carries node_name, raw_response, tags, summary,
    score and overall_sentiment. When the completion cannot be parsed into a
    JSON object the verdict fields keep their empty defaults.
    """
    node_result: dict[str, Any] = {
        "node_name": node_name,
        "raw_response": raw_response,
        "tags": [],
        "summary": "",
        "score": None,
        "overall_sentiment": None,
    }
    try:
        parsed = parse_llm_json(raw_response)
    except (json.JSONDecodeError, ValueError, TypeError) as e:
        # TypeError guards against a None completion reaching the parser.
        logger.warning(f"Could not parse QA response for '{node_name}': {e}")
        return node_result

    if not isinstance(parsed, dict):
        # A JSON array/scalar has no .get(); treat it as an unparseable verdict
        # rather than letting AttributeError abort the whole QA run.
        logger.warning(f"QA response for '{node_name}' is not a JSON object")
        return node_result

    node_result["tags"] = parsed.get("tags", [])
    node_result["summary"] = parsed.get("summary", "")
    node_result["score"] = parsed.get("call_quality_score")
    node_result["overall_sentiment"] = parsed.get("overall_sentiment")
    return node_result


async def _generate_conversation_summary(
    client: AsyncOpenAI,
    model: str,
    transcript: str,
    parent_ctx,
    node_name: str,
    total_token_usage: dict,
) -> str:
    """Generate a summary of the conversation so far (before the current node).

    Traced to Langfuse as conversation-summary-before-{node_name}.

    Args:
        client: OpenAI-compatible async client used for the completion.
        model: Model name passed to the completion call.
        transcript: Formatted transcript of everything before the current node.
        parent_ctx: Tracing parent context forwarded to add_qa_span_to_trace.
        node_name: Name of the node about to be evaluated (used in the span name).
        total_token_usage: Running token totals, updated in place.

    Returns:
        The summary text, or "" when the call fails (best effort: a missing
        summary must not prevent the node from being evaluated).
    """
    messages = [
        {"role": "system", "content": CONVERSATION_SUMMARY_SYSTEM_PROMPT},
        {"role": "user", "content": f"## Conversation\n{transcript}"},
    ]

    try:
        response = await client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=0,
        )
        summary = response.choices[0].message.content or ""
        accumulate_token_usage(total_token_usage, response)

        span_name = f"conversation-summary-before-{node_name}"
        add_qa_span_to_trace(parent_ctx, model, messages, summary, span_name)

        return summary
    except Exception as e:
        logger.warning(
            f"Failed to generate conversation summary before {node_name}: {e}"
        )
        return ""


async def run_per_node_qa_analysis(
    qa_node_data: dict[str, Any],
    workflow_run: WorkflowRunModel,
    workflow_run_id: int,
    workflow_definition: dict,
    definition_id: int | None,
) -> dict[str, Any]:
    """Run per-node QA analysis on a completed workflow run.

    Splits the call by node, generates per-node summaries and conversation
    context, then evaluates each node segment individually.

    Falls back to whole-call QA if events lack node_id.

    Args:
        qa_node_data: The QA node's data dict from the workflow definition.
        workflow_run: The workflow run whose logs are analysed.
        workflow_run_id: Run ID, used in log messages.
        workflow_definition: Workflow definition passed to ensure_node_summaries.
        definition_id: Definition ID passed to ensure_node_summaries.

    Returns:
        Dict with node_results, model and (when any usage was reported)
        token_usage; or a dict with "error" and empty node_results when a
        precondition is not met.
    """
    logs = workflow_run.logs or {}
    rtf_events = logs.get("realtime_feedback_events", [])
    if not rtf_events:
        logger.warning(f"No realtime_feedback_events for run {workflow_run_id}")
        return {"error": "no_transcript", "node_results": {}}

    # Try to split by node
    node_splits = split_events_by_node(rtf_events)
    if not node_splits:
        # Fall back to whole-call QA
        logger.info(
            f"Events lack node_id for run {workflow_run_id}, falling back to whole-call QA"
        )
        return await _run_whole_call_qa_analysis(
            qa_node_data, workflow_run, workflow_run_id
        )

    system_prompt = qa_node_data.get("qa_system_prompt", "")
    if not system_prompt:
        logger.warning("No system prompt defined for QA Node")
        return {"error": "no_system_prompt", "node_results": {}}

    # Resolve LLM config
    model, api_key, base_url = await resolve_llm_config(qa_node_data, workflow_run)
    if not api_key:
        logger.warning(
            f"No LLM API key configured for QA analysis on run {workflow_run_id}"
        )
        return {"error": "no_api_key", "node_results": {}}

    # Ensure node summaries
    node_summaries = await ensure_node_summaries(
        workflow_definition, definition_id, workflow_run, qa_node_data
    )

    # Set up Langfuse tracing
    parent_ctx = setup_langfuse_parent_context(workflow_run)

    client = _build_llm_client(api_key, base_url)

    total_token_usage: dict[str, int] = {}
    node_results: dict[str, Any] = {}
    prior_conversation: list[dict] = []  # Running accumulation of all prior nodes

    for node_id, node_name, node_events in node_splits:
        # Build this node's conversation and transcript
        node_conversation = build_conversation_structure(node_events)
        node_transcript = format_transcript(node_conversation)
        if not node_transcript:
            continue

        node_metrics = compute_call_metrics(node_events)
        node_summary = get_node_summary_text(node_summaries, node_id)

        # Summarize everything said before this node. prior_conversation is
        # empty for the first evaluated node, so no summary call is made then.
        previous_conversation_summary = ""
        if prior_conversation:
            previous_conversation_summary = await _generate_conversation_summary(
                client,
                model,
                format_transcript(prior_conversation),
                parent_ctx,
                node_name,
                total_token_usage,
            )

        # Substitute placeholders in the user's system prompt
        template_context = {
            "node_summary": node_summary,
            "previous_conversation_summary": previous_conversation_summary,
            "transcript": node_transcript,
            "metrics": json.dumps(node_metrics, indent=2),
        }
        system_content = render_template(system_prompt, template_context)

        messages = [
            {"role": "system", "content": system_content},
            {"role": "user", "content": f"## Transcript\n{node_transcript}"},
        ]

        # Call QA LLM
        try:
            response = await client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0,
            )
            raw_response = response.choices[0].message.content
            accumulate_token_usage(total_token_usage, response)
        except Exception as e:
            logger.error(
                f"QA LLM call failed for node '{node_name}' on run {workflow_run_id}: {e}"
            )
            node_results[node_id] = {
                "node_name": node_name,
                "error": str(e),
                "tags": [],
                "summary": "",
                "score": None,
            }
            # The node still happened: keep it as context for later nodes.
            prior_conversation.extend(node_conversation)
            continue

        # Trace
        add_qa_span_to_trace(
            parent_ctx, model, messages, raw_response, f"qa-node-{node_name}"
        )

        node_results[node_id] = _parse_node_result(node_name, raw_response)

        # Append this node's conversation to running total
        prior_conversation.extend(node_conversation)

    result: dict[str, Any] = {
        "node_results": node_results,
        "model": model,
    }
    if total_token_usage:
        result["token_usage"] = total_token_usage
    return result


async def _run_whole_call_qa_analysis(
    qa_node_data: dict[str, Any],
    workflow_run: WorkflowRunModel,
    workflow_run_id: int,
) -> dict[str, Any]:
    """Run whole-call QA analysis (fallback when events lack node_id).

    Returns results wrapped in the per-node format for consistency: the single
    verdict is stored under the "whole_call" key of node_results.
    """
    logs = workflow_run.logs or {}
    rtf_events = logs.get("realtime_feedback_events", [])
    if not rtf_events:
        logger.warning(f"No realtime_feedback_events for run {workflow_run_id}")
        return {"error": "no_transcript", "node_results": {}}

    conversation = build_conversation_structure(rtf_events)
    transcript = format_transcript(conversation)
    if not transcript:
        logger.warning(f"Empty transcript for run {workflow_run_id}")
        return {"error": "empty_transcript", "node_results": {}}

    # Compute call metrics
    usage_info = workflow_run.usage_info or {}
    call_duration = usage_info.get("call_duration_seconds")
    metrics = compute_call_metrics(rtf_events, call_duration)

    # Resolve LLM config
    system_prompt = qa_node_data.get("qa_system_prompt", "")
    if not system_prompt:
        logger.warning("No system prompt defined for QA Node")
        return {"error": "no_system_prompt", "node_results": {}}

    model, api_key, base_url = await resolve_llm_config(qa_node_data, workflow_run)

    if not api_key:
        logger.warning(
            f"No LLM API key configured for QA analysis on run {workflow_run_id}"
        )
        return {"error": "no_api_key", "node_results": {}}

    # Build messages -- per-node placeholders get empty defaults because there
    # is no node purpose or prior context in whole-call mode.
    template_context = {
        "node_summary": "",
        "previous_conversation_summary": "",
        "transcript": transcript,
        "metrics": json.dumps(metrics, indent=2),
    }
    system_content = render_template(system_prompt, template_context)
    messages = [
        {"role": "system", "content": system_content},
        {"role": "user", "content": f"## Transcript\n{transcript}"},
    ]

    client = _build_llm_client(api_key, base_url)

    try:
        response = await client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=0,
        )
        raw_response = response.choices[0].message.content
    except Exception as e:
        logger.error(f"QA LLM call failed for run {workflow_run_id}: {e}")
        return {"error": str(e), "node_results": {}}

    # Extract token usage
    token_usage: dict[str, int] = {}
    accumulate_token_usage(token_usage, response)

    node_result = _parse_node_result("whole_call", raw_response)

    # Langfuse tracing
    parent_ctx = setup_langfuse_parent_context(workflow_run)
    add_qa_span_to_trace(parent_ctx, model, messages, raw_response, "qa-analysis")

    result: dict[str, Any] = {
        "node_results": {"whole_call": node_result},
        "model": model,
    }
    if token_usage:
        result["token_usage"] = token_usage
    return result
def _utterance_time(event: dict) -> datetime:
    """Return the time of a spoken utterance event.

    Prefers the payload-level timestamp and falls back to the event-level one
    when the payload carries no usable timestamp (key missing, None or "").
    """
    stamp = event["payload"].get("timestamp") or event["timestamp"]
    return datetime.fromisoformat(stamp)


def build_conversation_structure(logs: list[dict]) -> list[dict]:
    """Transform raw call logs into a conversation structure for LLM QA analysis.

    Keeps bot text, final user transcriptions and function-call starts; every
    other event type is skipped. Times are expressed in seconds relative to the
    timestamp of the first event in ``logs``.

    Returns:
        A list of dicts with time_from_start_seconds, speaker ("assistant",
        "user" or "tool_call"), text, node_name and turn.
    """
    if not logs:
        return []

    start_time = datetime.fromisoformat(logs[0]["timestamp"])

    conversation = []
    for event in logs:
        event_type = event["type"]
        if event_type == RealtimeFeedbackType.BOT_TEXT.value:
            speaker = "assistant"
            utterance_text = event["payload"]["text"]
            event_time = _utterance_time(event)
        elif event_type == RealtimeFeedbackType.USER_TRANSCRIPTION.value and event[
            "payload"
        ].get("final", False):
            # Interim (non-final) transcriptions are dropped.
            speaker = "user"
            utterance_text = event["payload"]["text"]
            event_time = _utterance_time(event)
        elif event_type == RealtimeFeedbackType.FUNCTION_CALL_START.value:
            speaker = "tool_call"
            utterance_text = event["payload"].get("function_name", "unknown")
            # Tool calls are timed by the event-level timestamp only.
            event_time = datetime.fromisoformat(event["timestamp"])
        else:
            continue

        time_from_start = (event_time - start_time).total_seconds()

        conversation.append(
            {
                "time_from_start_seconds": round(time_from_start, 2),
                "speaker": speaker,
                "text": utterance_text,
                "node_name": event.get("node_name", ""),
                "turn": event.get("turn", 0),
            }
        )

    return conversation


def format_transcript(conversation: list[dict]) -> str:
    """Format conversation structure into a readable transcript string for the LLM.

    Each entry becomes one line, ``[<seconds>s] <speaker>: <text>``; tool calls
    are labelled ``[tool_call]`` instead of a speaker name.
    """
    lines = []
    for entry in conversation:
        speaker = entry["speaker"]
        label = "[tool_call]" if speaker == "tool_call" else speaker
        lines.append(
            f"[{entry['time_from_start_seconds']:.1f}s] {label}: {entry['text']}"
        )
    return "\n".join(lines)


def split_events_by_node(
    rtf_events: list[dict],
) -> list[tuple[str, str, list[dict]]]:
    """Split realtime_feedback_events by node_id.

    Returns an ordered list of (node_id, node_name, events) tuples, positioned
    by each node's first occurrence. Only includes nodes that have
    conversational content (BOT_TEXT or USER_TRANSCRIPTION).

    Returns an empty list as soon as any event has no node_id, which signals
    the caller to fall back to whole-call analysis.
    """
    # Plain dicts preserve insertion order -- first occurrence defines position
    events_by_node: dict[str, list[dict]] = {}
    node_names: dict[str, str] = {}

    for event in rtf_events:
        node_id = event.get("node_id")
        if not node_id:
            return []  # Events lack node_id -- caller should fall back

        if node_id not in events_by_node:
            events_by_node[node_id] = []
            node_names[node_id] = event.get("node_name", "")

        events_by_node[node_id].append(event)

    if not events_by_node:
        return []

    conversational_types = {
        RealtimeFeedbackType.BOT_TEXT.value,
        RealtimeFeedbackType.USER_TRANSCRIPTION.value,
    }

    # Filter to nodes with conversational content
    return [
        (node_id, node_names[node_id], events)
        for node_id, events in events_by_node.items()
        if any(e["type"] in conversational_types for e in events)
    ]
"https://api.groq.com/openai/v1" + if provider == "google": + return "https://generativelanguage.googleapis.com/v1beta/openai/" + if provider == "azure": + return endpoint or None + return None + + +async def resolve_llm_config( + qa_node_data: dict, workflow_run: WorkflowRunModel +) -> tuple[str, str, str | None]: + """Resolve the LLM model, API key, and base URL for QA analysis. + + If the QA node has its own LLM configuration (qa_use_workflow_llm=False), + use those settings directly. Otherwise, fall back to the user's configured LLM. + + Returns: + (model, api_key, base_url) tuple + """ + if not qa_node_data.get("qa_use_workflow_llm", True): + return ( + qa_node_data.get("qa_model"), + qa_node_data.get("qa_api_key"), + _provider_base_url( + qa_node_data.get("qa_provider"), + qa_node_data.get("qa_endpoint", ""), + ), + ) + + # Fall back to user's configured LLM + model, api_key, base_url = await resolve_user_llm_config(workflow_run) + + qa_model = qa_node_data.get("qa_model", "default") + if qa_model and qa_model != "default": + model = qa_model + + return model, api_key, base_url + + +async def resolve_user_llm_config( + workflow_run: WorkflowRunModel, +) -> tuple[str, str, str | None]: + """Resolve the user's configured LLM (from UserConfiguration). 
+ + Returns: + (model, api_key, base_url) tuple + """ + user_id = None + if workflow_run.workflow and workflow_run.workflow.user: + user_id = workflow_run.workflow.user.id + + llm_config: dict = {} + if user_id: + user_configuration = await db_client.get_user_configurations(user_id) + llm_config = user_configuration.model_dump(exclude_none=True).get("llm", {}) + + provider = llm_config.get("provider", "openai") + api_key = llm_config.get("api_key", "") + model = llm_config.get("model", "gpt-4.1") + base_url = _provider_base_url(provider, llm_config.get("endpoint", "")) + if provider == "openrouter" and llm_config.get("base_url"): + base_url = llm_config["base_url"] + + return model, api_key, base_url + + +def accumulate_token_usage(total: dict, response) -> None: + """Add token counts from an LLM response to the running total dict.""" + if not response.usage: + return + total["prompt_tokens"] = total.get("prompt_tokens", 0) + ( + response.usage.prompt_tokens or 0 + ) + total["completion_tokens"] = total.get("completion_tokens", 0) + ( + response.usage.completion_tokens or 0 + ) + total["total_tokens"] = total.get("total_tokens", 0) + ( + response.usage.total_tokens or 0 + ) + total["cache_read_input_tokens"] = total.get("cache_read_input_tokens", 0) + ( + getattr(response.usage, "cache_read_input_tokens", 0) or 0 + ) + cache_creation = getattr(response.usage, "cache_creation_input_tokens", None) + if cache_creation is not None: + total["cache_creation_input_tokens"] = ( + total.get("cache_creation_input_tokens") or 0 + ) + cache_creation diff --git a/api/services/workflow/qa/metrics.py b/api/services/workflow/qa/metrics.py new file mode 100644 index 0000000..a432e7c --- /dev/null +++ b/api/services/workflow/qa/metrics.py @@ -0,0 +1,37 @@ +"""Call metrics computation from raw event logs.""" + +from pipecat.utils.enums import RealtimeFeedbackType + + +def compute_call_metrics( + logs: list[dict], call_duration_seconds: float | None = None +) -> dict: + """Pre-compute 
def compute_call_metrics(
    logs: list[dict], call_duration_seconds: float | None = None
) -> dict:
    """Pre-compute quantitative metrics from raw call logs.

    Args:
        logs: Raw call events. Each event is read for its ``"type"``,
            ``"payload"`` and ``"turn"`` keys; events missing them are
            ignored rather than aborting the computation.
        call_duration_seconds: Passed through unchanged into the result.

    Returns:
        A dict with ``call_duration_seconds``, ``num_turns`` (number of
        distinct ``turn`` values seen on user-transcription / bot-text
        events), ``avg_latency_seconds``, ``avg_ttfb_seconds`` and
        ``max_latency_seconds``. The latency/TTFB figures are rounded to two
        decimals and are ``None`` when no measurement was recorded.
    """
    # Resolve the enum values once instead of on every event.
    latency_type = RealtimeFeedbackType.LATENCY_MEASURED.value
    ttfb_type = RealtimeFeedbackType.TTFB_METRIC.value
    turn_types = (
        RealtimeFeedbackType.USER_TRANSCRIPTION.value,
        RealtimeFeedbackType.BOT_TEXT.value,
    )

    latencies: list[float] = []
    ttfb_values: list[float] = []
    turns = set()

    # Single pass over the log; the measurement and turn checks are kept
    # independent of each other, exactly as in the two-loop formulation.
    for event in logs or []:
        event_type = event.get("type")

        if event_type == latency_type:
            measured = (event.get("payload") or {}).get("latency_seconds")
            if measured is not None:
                latencies.append(measured)
        elif event_type == ttfb_type:
            measured = (event.get("payload") or {}).get("ttfb_seconds")
            if measured is not None:
                ttfb_values.append(measured)

        if event_type in turn_types:
            # Events without a turn index are all counted under turn 0.
            turns.add(event.get("turn", 0))

    return {
        "call_duration_seconds": call_duration_seconds,
        "num_turns": len(turns),
        "avg_latency_seconds": (
            round(sum(latencies) / len(latencies), 2) if latencies else None
        ),
        "avg_ttfb_seconds": (
            round(sum(ttfb_values) / len(ttfb_values), 2) if ttfb_values else None
        ),
        "max_latency_seconds": round(max(latencies), 2) if latencies else None,
    }
def get_node_summary_text(node_summaries: dict, node_id: str) -> str:
    """Extract the summary text from a node_summaries entry.

    Handles both the current format (dict with a "summary" key) and the
    legacy format (plain string) for backward compatibility.

    Args:
        node_summaries: Mapping of node id to summary entry; ``None`` is
            treated as an empty mapping.
        node_id: Id of the node whose summary is wanted.

    Returns:
        The summary string, or ``""`` when the node has no entry, the entry
        has no usable "summary" value, or the entry has an unexpected shape.
    """
    entry = (node_summaries or {}).get(node_id)
    if isinstance(entry, str):
        # Legacy format: the entry is the summary itself.
        return entry
    if isinstance(entry, dict):
        summary = entry.get("summary")
        # Guard against a stored null so callers always receive a string.
        return summary if isinstance(summary, str) else ""
    return ""
def _build_node_summary_input(
    node_name: str,
    node_data: dict,
    tool_map: dict[str, Any],
    outgoing_edges: list[dict],
) -> str:
    """Render a node's name, prompt and available tools as the LLM user message."""
    parts = [f"Node name: {node_name}"]
    prompt = node_data.get("prompt")
    if prompt:
        parts.append(f"Agent prompt:\n{prompt}")

    tool_lines: list[str] = []

    # Custom tools attached to the node (only those that were resolved).
    for tool_uuid in node_data.get("tool_uuids") or []:
        tool_info = tool_map.get(tool_uuid)
        if not tool_info:
            continue
        line = f"- {tool_info['name']}"
        if tool_info["description"]:
            line += f": {tool_info['description']}"
        tool_lines.append(line)

    # Outgoing edges are listed alongside the tools; unlabeled edges are skipped.
    for edge in outgoing_edges:
        edge_data = edge.get("data") or {}
        label = edge_data.get("label", "")
        if not label:
            continue
        line = f"- {label}"
        condition = edge_data.get("condition", "")
        if condition:
            line += f": {condition}"
        tool_lines.append(line)

    if tool_lines:
        parts.append("Available tools:\n" + "\n".join(tool_lines))
    return "\n".join(parts)


async def ensure_node_summaries(
    workflow_definition: dict,
    definition_id: int | None,
    workflow_run: WorkflowRunModel,
    qa_node_data: dict,
) -> dict[str, Any]:
    """Ensure every agent/start node has a summary in the definition.

    Nodes whose type is ``NodeType.agentNode`` or ``NodeType.startNode`` and
    that have no entry under ``workflow_definition["node_summaries"]`` get a
    summary generated by the configured LLM. Newly generated summaries are
    written back through ``db_client.update_definition_node_summaries`` when
    ``definition_id`` is set.

    A node whose generation fails gets an empty ``{"summary": ""}``
    placeholder in the returned dict, but that placeholder is NOT persisted,
    so the node is retried on the next run instead of being cached as empty.

    Args:
        workflow_definition: Workflow JSON; read for "node_summaries",
            "nodes" and "edges".
        definition_id: Id of the definition row to update, or ``None`` to
            skip persistence.
        workflow_run: Run used to resolve the LLM config and organization.
        qa_node_data: QA node data forwarded to ``resolve_llm_config``.

    Returns:
        The node_summaries dict:
        ``{node_id: {"summary": "...", "trace_url": "..."}, ...}``
        (``trace_url`` is present only when a trace was created).
    """
    # ``or`` also covers an explicit null stored in the definition JSON.
    existing_summaries: dict[str, Any] = (
        workflow_definition.get("node_summaries") or {}
    )

    nodes = workflow_definition.get("nodes") or []
    summarizable_types = {NodeType.agentNode.value, NodeType.startNode.value}
    nodes_needing_summary = [
        n
        for n in nodes
        if n.get("type") in summarizable_types
        and n.get("id") is not None
        and n.get("id") not in existing_summaries
    ]

    if not nodes_needing_summary:
        return existing_summaries

    model, api_key, base_url = await resolve_llm_config(qa_node_data, workflow_run)
    if not api_key:
        logger.warning("No API key for node summary generation, skipping")
        return existing_summaries

    client_kwargs: dict[str, Any] = {"api_key": api_key}
    if base_url:
        client_kwargs["base_url"] = base_url
    client = AsyncOpenAI(**client_kwargs)

    # Collect all tool UUIDs across nodes and fetch them in one query.
    all_tool_uuids: set[str] = {
        tool_uuid
        for node in nodes_needing_summary
        for tool_uuid in (node.get("data") or {}).get("tool_uuids") or []
    }

    tool_map: dict[str, Any] = {}
    if all_tool_uuids:
        organization_id = (
            workflow_run.workflow.organization_id if workflow_run.workflow else None
        )
        if organization_id:
            try:
                tools = await db_client.get_tools_by_uuids(
                    list(all_tool_uuids), organization_id
                )
                for t in tools:
                    tool_map[t.tool_uuid] = {
                        "name": t.name,
                        "description": t.description or "",
                    }
            except Exception as e:
                # Best effort: summaries are still generated without tool info.
                logger.warning(f"Failed to fetch tools for node summaries: {e}")

    # Group outgoing edges by their source node id.
    outgoing_edges_by_node: dict[str, list[dict]] = {}
    for edge in workflow_definition.get("edges") or []:
        source = edge.get("source")
        if source:
            outgoing_edges_by_node.setdefault(source, []).append(edge)

    updated_summaries = dict(existing_summaries)
    failed_node_ids: set[str] = set()

    for node in nodes_needing_summary:
        node_id = node["id"]
        node_data = node.get("data") or {}
        node_name = node_data.get("name", "Unnamed")

        node_info = _build_node_summary_input(
            node_name,
            node_data,
            tool_map,
            outgoing_edges_by_node.get(node_id, []),
        )
        messages = [
            {"role": "system", "content": NODE_SUMMARY_SYSTEM_PROMPT},
            {"role": "user", "content": node_info},
        ]

        try:
            response = await client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0,
            )
            summary_text = response.choices[0].message.content or ""
        except Exception as e:
            logger.warning(f"Failed to generate summary for node {node_id}: {e}")
            updated_summaries[node_id] = {"summary": ""}
            failed_node_ids.add(node_id)
            continue

        # Create a Langfuse trace for this summary generation.
        trace_url = create_node_summary_trace(model, messages, summary_text, node_name)

        entry: dict[str, Any] = {"summary": summary_text}
        if trace_url:
            entry["trace_url"] = trace_url
        updated_summaries[node_id] = entry

    # Persist only real results; failure placeholders stay out of the DB so
    # those nodes are picked up again by the "no entry yet" filter next run.
    persistable = {
        node_id: entry
        for node_id, entry in updated_summaries.items()
        if node_id not in failed_node_ids
    }
    if definition_id and persistable != existing_summaries:
        try:
            await db_client.update_definition_node_summaries(
                definition_id, persistable
            )
        except Exception as e:
            logger.warning(f"Failed to persist node summaries: {e}")

    return updated_summaries
Exception as e: + logger.warning(f"Failed to persist node summaries: {e}") + + return updated_summaries diff --git a/api/services/workflow/qa/tracing.py b/api/services/workflow/qa/tracing.py new file mode 100644 index 0000000..ab2bc33 --- /dev/null +++ b/api/services/workflow/qa/tracing.py @@ -0,0 +1,154 @@ +"""Langfuse / OpenTelemetry tracing helpers for QA analysis.""" + +import re + +from loguru import logger + +from api.db.models import WorkflowRunModel + + +def extract_trace_id(gathered_context: dict) -> str | None: + """Extract Langfuse trace_id from gathered_context trace_url. + + URL format: https://langfuse.dograh.com/project//traces/ + """ + trace_url = gathered_context.get("trace_url") + if not trace_url: + return None + try: + match = re.search(r"/traces/([a-fA-F0-9]+)$", trace_url) + if match: + return match.group(1) + except Exception: + pass + return None + + +def setup_langfuse_parent_context(workflow_run: WorkflowRunModel): + """Set up OTEL parent context from the workflow run's Langfuse trace. + + Returns the parent context object, or None if tracing is unavailable. 
def setup_langfuse_parent_context(workflow_run: WorkflowRunModel):
    """Build an OTEL parent context pointing at the run's Langfuse trace.

    The trace id is taken from the run's ``gathered_context`` via
    ``extract_trace_id`` and wrapped in a remote, non-recording span so that
    spans started with the returned context join that trace.

    Returns:
        The parent context object, or ``None`` when tracing is disabled, no
        trace id is available, or anything fails along the way (failures are
        logged as warnings, never raised).
    """
    try:
        from opentelemetry.trace import (
            NonRecordingSpan,
            SpanContext,
            TraceFlags,
            set_span_in_context,
        )

        from api.services.pipecat.tracing_config import (
            is_tracing_enabled,
            setup_tracing_exporter,
        )

        if not is_tracing_enabled():
            return None

        setup_tracing_exporter()

        trace_id = extract_trace_id(workflow_run.gathered_context or {})
        if not trace_id:
            logger.debug("No trace_id found, skipping Langfuse tracing")
            return None

        # The parent span id is a fixed value (1); only the trace id comes
        # from the run. Trace flag 0x01 is OpenTelemetry's "sampled" flag.
        remote_parent = NonRecordingSpan(
            SpanContext(
                trace_id=int(trace_id, 16),
                span_id=0x1,
                is_remote=True,
                trace_flags=TraceFlags(0x01),
            )
        )
        return set_span_in_context(remote_parent)

    except Exception as exc:
        logger.warning(f"Failed to set up Langfuse parent context: {exc}")
        return None


def add_qa_span_to_trace(
    parent_ctx,
    model: str,
    messages: list[dict],
    output: str,
    span_name: str,
) -> None:
    """Record one QA LLM call as a child span of the conversation trace.

    Does nothing when ``parent_ctx`` is ``None``. Any failure while creating
    or annotating the span is logged as a warning and not raised.

    Args:
        parent_ctx: Context to start the span under.
        model: Model name recorded on the span.
        messages: Chat messages sent to the model.
        output: Text returned by the model.
        span_name: Used as both the span name and the operation name.
    """
    if parent_ctx is None:
        return
    try:
        from opentelemetry import trace as otel_trace

        from pipecat.utils.tracing.service_attributes import add_llm_span_attributes

        llm_attributes = {
            "service_name": "OpenAILLMService",
            "model": model,
            "operation_name": span_name,
            "messages": messages,
            "output": output,
            "stream": False,
            "parameters": {"temperature": 0},
        }
        qa_tracer = otel_trace.get_tracer("pipecat")
        with qa_tracer.start_as_current_span(span_name, context=parent_ctx) as qa_span:
            add_llm_span_attributes(qa_span, **llm_attributes)
    except Exception as exc:
        logger.warning(f"Failed to trace span '{span_name}' to Langfuse: {exc}")
def create_node_summary_trace(
    model: str,
    messages: list[dict],
    output: str,
    node_name: str,
) -> str | None:
    """Record a node summary generation as its own standalone Langfuse trace.

    The span is started under a fresh, empty OTEL ``Context`` so it becomes
    the root of a new trace rather than a child of whatever span is current.

    Args:
        model: Model name recorded on the span.
        messages: Chat messages sent to the model.
        output: Summary text returned by the model.
        node_name: Node name, used in the span/operation name.

    Returns:
        The trace URL, or ``None`` when tracing is disabled or any step
        fails (failures are logged as warnings, never raised).
    """
    try:
        from opentelemetry import trace as otel_trace
        from opentelemetry.context import Context

        from api.services.pipecat.tracing_config import (
            is_tracing_enabled,
            setup_tracing_exporter,
        )
        from pipecat.utils.tracing.service_attributes import add_llm_span_attributes

        if not is_tracing_enabled():
            return None

        setup_tracing_exporter()

        span_label = f"node-summary-{node_name}"
        summary_tracer = otel_trace.get_tracer("pipecat")

        # An empty Context means no parent span, i.e. a brand-new trace.
        with summary_tracer.start_as_current_span(
            span_label,
            context=Context(),
        ) as root_span:
            add_llm_span_attributes(
                root_span,
                service_name="OpenAILLMService",
                model=model,
                operation_name=span_label,
                messages=messages,
                output=output,
                stream=False,
                parameters={"temperature": 0},
            )
            # Format the integer trace id as 32 zero-padded hex digits.
            trace_id_hex = f"{root_span.get_span_context().trace_id:032x}"

        from langfuse import get_client

        return get_client().get_trace_url(trace_id=trace_id_hex)

    except Exception as exc:
        logger.warning(f"Failed to create node summary trace for '{node_name}': {exc}")
        return None
@@ -78,16 +80,33 @@ async def _run_qa_nodes( try: logger.info(f"Running QA analysis for node '{node_name}' (#{node_id})") - result = await run_qa_analysis(node_data, workflow_run, workflow_run_id) + result = await run_per_node_qa_analysis( + node_data, + workflow_run, + workflow_run_id, + workflow_definition, + definition_id, + ) results[f"qa_{node_id}"] = result + # Log summary from node_results + node_results = result.get("node_results", {}) logger.info( f"QA analysis complete for '{node_name}': " - f"score={result.get('score')}, tags={len(result.get('tags', []))}" + f"{len(node_results)} nodes analyzed" ) except Exception as e: logger.error(f"QA analysis failed for node '{node_name}': {e}") results[f"qa_{node_id}"] = {"error": str(e)} + # Collect all unique tags across all QA node results for top-level filtering + all_tags: set[str] = set() + for result in results.values(): + for node_result in result.get("node_results", {}).values(): + if isinstance(node_result, dict): + all_tags.update(node_result.get("tags", [])) + if all_tags: + results["tags"] = sorted(all_tags) + return results @@ -159,8 +178,16 @@ async def run_integrations_post_workflow_run(_ctx, workflow_run_id: int): logger.warning("No organization found, skipping integrations") return - # Step 2: Get workflow definition - workflow_definition = workflow_run.workflow.workflow_definition_with_fallback + # Step 2: Get workflow definition (prefer the run-specific definition) + if workflow_run.definition: + workflow_definition = workflow_run.definition.workflow_json + definition_id = workflow_run.definition.id + else: + workflow_definition = ( + workflow_run.workflow.workflow_definition_with_fallback + ) + definition_id = workflow_run.workflow.current_definition_id + if not workflow_definition: logger.debug("No workflow definition, skipping integrations") return @@ -183,7 +210,13 @@ async def run_integrations_post_workflow_run(_ctx, workflow_run_id: int): # Step 5: Run QA analysis before webhooks if 
qa_nodes: logger.info(f"Found {len(qa_nodes)} QA nodes to execute") - qa_results = await _run_qa_nodes(qa_nodes, workflow_run, workflow_run_id) + qa_results = await _run_qa_nodes( + qa_nodes, + workflow_run, + workflow_run_id, + workflow_definition, + definition_id, + ) if qa_results: await db_client.update_workflow_run( diff --git a/ui/src/app/workflow/[workflowId]/hooks/useWorkflowState.ts b/ui/src/app/workflow/[workflowId]/hooks/useWorkflowState.ts index b6f49f0..81df8c4 100644 --- a/ui/src/app/workflow/[workflowId]/hooks/useWorkflowState.ts +++ b/ui/src/app/workflow/[workflowId]/hooks/useWorkflowState.ts @@ -22,7 +22,13 @@ import logger from '@/lib/logger'; import { getNextNodeId, getRandomId } from "@/lib/utils"; import { DEFAULT_WORKFLOW_CONFIGURATIONS, WorkflowConfigurations } from "@/types/workflow-configurations"; -const DEFAULT_QA_SYSTEM_PROMPT = `You are a QA expert analyzing voice AI call transcripts. Analyze the conversation and return a structured JSON assessment. +const DEFAULT_QA_SYSTEM_PROMPT = `You are a QA analyst evaluating a specific segment of a voice AI conversation. + +## Node Purpose +{{node_summary}} + +## Previous Conversation Context (For start of conversation, previous conversation summary can be empty.) +{{previous_conversation_summary}} ## Tags to evaluate @@ -42,7 +48,7 @@ Examine the conversation carefully and identify which of the following tags appl ## Call metrics (pre-computed) Use these alongside the transcript for your analysis: -{metrics} +{{metrics}} ## Output format @@ -56,7 +62,7 @@ Return ONLY a valid JSON object (no markdown): ], "overall_sentiment": "positive|neutral|negative", "call_quality_score": <1-10>, - "summary": "1-2 sentence summary of the call" + "summary": "1-2 sentence summary of this segment" } If no tags apply, return an empty tags list. 
Always provide sentiment, score, and summary.`; diff --git a/ui/src/components/flow/nodes/QANode.tsx b/ui/src/components/flow/nodes/QANode.tsx index d08caf4..8af7934 100644 --- a/ui/src/components/flow/nodes/QANode.tsx +++ b/ui/src/components/flow/nodes/QANode.tsx @@ -268,14 +268,16 @@ const QANodeEditForm = ({