"""Main QA analysis orchestrator — per-node and whole-call fallback.""" import json from typing import Any from loguru import logger from openai import AsyncOpenAI from api.db.models import WorkflowRunModel from api.services.gen_ai.json_parser import parse_llm_json from api.services.workflow.qa.conversation import ( build_conversation_structure, format_transcript, split_events_by_node, ) from api.services.workflow.qa.llm_config import ( accumulate_token_usage, resolve_llm_config, ) from api.services.workflow.qa.metrics import compute_call_metrics from api.services.workflow.qa.node_summary import ( CONVERSATION_SUMMARY_SYSTEM_PROMPT, ensure_node_summaries, get_node_summary_text, ) from api.services.workflow.qa.tracing import ( add_qa_span_to_trace, setup_langfuse_parent_context, ) from api.utils.template_renderer import render_template async def _generate_conversation_summary( client: AsyncOpenAI, model: str, transcript: str, parent_ctx, node_name: str, total_token_usage: dict, ) -> str: """Generate a summary of the conversation so far (before the current node). Traced to Langfuse as conversation-summary-before-{node_name}. """ messages = [ {"role": "system", "content": CONVERSATION_SUMMARY_SYSTEM_PROMPT}, {"role": "user", "content": f"## Conversation\n{transcript}"}, ] try: response = await client.chat.completions.create( model=model, messages=messages, temperature=0, ) summary = response.choices[0].message.content or "" accumulate_token_usage(total_token_usage, response) span_name = f"conversation-summary-before-{node_name}" add_qa_span_to_trace(parent_ctx, model, messages, summary, span_name) return summary except Exception as e: logger.warning( f"Failed to generate conversation summary before {node_name}: {e}" ) return "" async def run_per_node_qa_analysis( qa_node_data: dict[str, Any], workflow_run: WorkflowRunModel, workflow_run_id: int, workflow_definition: dict, definition_id: int | None, ) -> dict[str, Any]: """Run per-node QA analysis on a completed workflow run. Splits the call by node, generates per-node summaries and conversation context, then evaluates each node segment individually. Falls back to whole-call QA if events lack node_id. Returns: Dict with node_results, token_usage, model """ logs = workflow_run.logs or {} rtf_events = logs.get("realtime_feedback_events", []) if not rtf_events: logger.warning(f"No realtime_feedback_events for run {workflow_run_id}") return {"error": "no_transcript", "node_results": {}} # Try to split by node node_splits = split_events_by_node(rtf_events) if not node_splits: # Fall back to whole-call QA logger.info( f"Events lack node_id for run {workflow_run_id}, falling back to whole-call QA" ) return await _run_whole_call_qa_analysis( qa_node_data, workflow_run, workflow_run_id ) system_prompt = qa_node_data.get("qa_system_prompt", "") if not system_prompt: logger.warning("No system prompt defined for QA Node") return {"error": "no_system_prompt", "node_results": {}} # Resolve LLM config model, api_key, base_url = await resolve_llm_config(qa_node_data, workflow_run) if not api_key: logger.warning( f"No LLM API key configured for QA analysis on run {workflow_run_id}" ) return {"error": "no_api_key", "node_results": {}} # Ensure node summaries node_summaries = await ensure_node_summaries( workflow_definition, definition_id, workflow_run, qa_node_data ) # Set up Langfuse tracing parent_ctx = setup_langfuse_parent_context(workflow_run) # Build LLM client client_kwargs: dict[str, Any] = {"api_key": api_key} if base_url: client_kwargs["base_url"] = base_url client = AsyncOpenAI(**client_kwargs) total_token_usage: dict[str, int] = {} node_results: dict[str, Any] = {} prior_conversation: list[dict] = [] # Running accumulation of all prior nodes for idx, (node_id, node_name, node_events) in enumerate(node_splits): # Build this node's conversation and transcript node_conversation = build_conversation_structure(node_events) node_transcript = format_transcript(node_conversation) if not node_transcript: continue # Compute per-node metrics node_metrics = compute_call_metrics(node_events) # Get node summary node_summary = get_node_summary_text(node_summaries, node_id) # Generate conversation summary from prior nodes (if not first) previous_conversation_summary = "" if idx > 0 and prior_conversation: prior_transcript = format_transcript(prior_conversation) previous_conversation_summary = await _generate_conversation_summary( client, model, prior_transcript, parent_ctx, node_name, total_token_usage, ) # Substitute placeholders in the user's system prompt template_context = { "node_summary": node_summary, "previous_conversation_summary": previous_conversation_summary, "transcript": node_transcript, "metrics": json.dumps(node_metrics, indent=2), } system_content = render_template(system_prompt, template_context) messages = [ {"role": "system", "content": system_content}, {"role": "user", "content": f"## Transcript\n{node_transcript}"}, ] # Call QA LLM try: response = await client.chat.completions.create( model=model, messages=messages, temperature=0, ) raw_response = response.choices[0].message.content accumulate_token_usage(total_token_usage, response) except Exception as e: logger.error( f"QA LLM call failed for node '{node_name}' on run {workflow_run_id}: {e}" ) node_results[node_id] = { "node_name": node_name, "error": str(e), "tags": [], "summary": "", "score": None, } prior_conversation.extend(node_conversation) continue # Trace span_name = f"qa-node-{node_name}" add_qa_span_to_trace(parent_ctx, model, messages, raw_response, span_name) # Parse response node_result: dict[str, Any] = { "node_name": node_name, "raw_response": raw_response, } try: parsed = parse_llm_json(raw_response) node_result["tags"] = parsed.get("tags", []) node_result["summary"] = parsed.get("summary", "") node_result["score"] = parsed.get("call_quality_score") node_result["overall_sentiment"] = parsed.get("overall_sentiment") except (json.JSONDecodeError, ValueError): node_result["tags"] = [] node_result["summary"] = "" node_result["score"] = None node_results[node_id] = node_result # Append this node's conversation to running total prior_conversation.extend(node_conversation) result: dict[str, Any] = { "node_results": node_results, "model": model, } if total_token_usage: result["token_usage"] = total_token_usage return result async def _run_whole_call_qa_analysis( qa_node_data: dict[str, Any], workflow_run: WorkflowRunModel, workflow_run_id: int, ) -> dict[str, Any]: """Run whole-call QA analysis (fallback when events lack node_id). Returns results wrapped in the per-node format for consistency. """ logs = workflow_run.logs or {} rtf_events = logs.get("realtime_feedback_events", []) if not rtf_events: logger.warning(f"No realtime_feedback_events for run {workflow_run_id}") return {"error": "no_transcript", "node_results": {}} conversation = build_conversation_structure(rtf_events) transcript = format_transcript(conversation) if not transcript: logger.warning(f"Empty transcript for run {workflow_run_id}") return {"error": "empty_transcript", "node_results": {}} # Compute call metrics usage_info = workflow_run.usage_info or {} call_duration = usage_info.get("call_duration_seconds") metrics = compute_call_metrics(rtf_events, call_duration) # Resolve LLM config system_prompt = qa_node_data.get("qa_system_prompt", "") if not system_prompt: logger.warning("No system prompt defined for QA Node") return {"error": "no_system_prompt", "node_results": {}} model, api_key, base_url = await resolve_llm_config(qa_node_data, workflow_run) if not api_key: logger.warning( f"No LLM API key configured for QA analysis on run {workflow_run_id}" ) return {"error": "no_api_key", "node_results": {}} # Build messages — substitute all placeholders with sensible defaults template_context = { "node_summary": "", "previous_conversation_summary": "", "transcript": transcript, "metrics": json.dumps(metrics, indent=2), } system_content = render_template(system_prompt, template_context) messages = [ {"role": "system", "content": system_content}, {"role": "user", "content": f"## Transcript\n{transcript}"}, ] # Call LLM client_kwargs: dict[str, Any] = {"api_key": api_key} if base_url: client_kwargs["base_url"] = base_url client = AsyncOpenAI(**client_kwargs) try: response = await client.chat.completions.create( model=model, messages=messages, temperature=0, ) raw_response = response.choices[0].message.content except Exception as e: logger.error(f"QA LLM call failed for run {workflow_run_id}: {e}") return {"error": str(e), "node_results": {}} # Extract token usage token_usage: dict[str, int] = {} accumulate_token_usage(token_usage, response) # Parse response node_result: dict[str, Any] = { "node_name": "whole_call", "raw_response": raw_response, } try: parsed = parse_llm_json(raw_response) node_result["tags"] = parsed.get("tags", []) node_result["summary"] = parsed.get("summary", "") node_result["score"] = parsed.get("call_quality_score") node_result["overall_sentiment"] = parsed.get("overall_sentiment") except (json.JSONDecodeError, ValueError): node_result["tags"] = [] node_result["summary"] = "" node_result["score"] = None # Langfuse tracing parent_ctx = setup_langfuse_parent_context(workflow_run) add_qa_span_to_trace(parent_ctx, model, messages, raw_response, "qa-analysis") result: dict[str, Any] = { "node_results": {"whole_call": node_result}, "model": model, } if token_usage: result["token_usage"] = token_usage return result