dograh/api/tasks/run_integrations.py
Abhishek d4b6afb020
feat: add logs in campaigns for failure or pausing (#265)
* feat: add logs in campaigns on failure

* chore: bump pipecat

* chore: update format.sh

* chore: fix github workflow

* fix: fix formatting errors
2026-05-05 19:23:50 +05:30

430 lines
15 KiB
Python

"""Execute integrations (QA analysis, webhooks) after workflow run completion."""
import random
from typing import Any, Dict, Optional
import httpx
from loguru import logger
from pydantic import ValidationError
from api.constants import BACKEND_API_ENDPOINT
from api.db import db_client
from api.db.models import WorkflowRunModel
from api.enums import OrganizationConfigurationKey
from api.services.pipecat.tracing_config import register_org_langfuse_credentials
from api.services.workflow.dto import (
QANodeData,
QARFNode,
WebhookNodeData,
WebhookRFNode,
)
from api.services.workflow.qa import run_per_node_qa_analysis
from api.utils.credential_auth import build_auth_header
from api.utils.template_renderer import render_template
from pipecat.utils.enums import EndTaskReason
from pipecat.utils.run_context import set_current_org_id, set_current_run_id
def _should_skip_qa(
qa_data: QANodeData,
workflow_run: WorkflowRunModel,
) -> str | None:
"""Check whether QA analysis should be skipped for this call.
Returns a reason string if the call should be skipped, or None if it should proceed.
"""
usage_info = workflow_run.usage_info or {}
call_duration = usage_info.get("call_duration_seconds")
if call_duration is not None and call_duration < qa_data.qa_min_call_duration:
return (
f"call duration ({call_duration:.1f}s) below minimum "
f"({qa_data.qa_min_call_duration}s)"
)
if not qa_data.qa_voicemail_calls:
gathered_context = workflow_run.gathered_context or {}
call_disposition = gathered_context.get("call_disposition", "")
if call_disposition == EndTaskReason.VOICEMAIL_DETECTED.value:
return "voicemail call and QA voicemail calls is disabled"
if qa_data.qa_sample_rate < 100:
roll = random.randint(1, 100)
if roll > qa_data.qa_sample_rate:
return (
f"excluded by sampling ({qa_data.qa_sample_rate}% sample rate, "
f"rolled {roll})"
)
return None
async def _run_qa_nodes(
qa_nodes: list[dict],
workflow_run: WorkflowRunModel,
workflow_run_id: int,
workflow_definition: dict,
definition_id: int | None,
) -> Dict[str, Any]:
"""Run QA analysis for each enabled QA node and aggregate results.
Returns:
Dict keyed by node ID with QA analysis results.
"""
results: Dict[str, Any] = {}
for node in qa_nodes:
node_id = node.get("id", "unknown")
try:
qa_node = QARFNode.model_validate(node)
except ValidationError as e:
logger.warning(f"QA node #{node_id} failed validation, skipping: {e}")
results[f"qa_{node_id}"] = {"error": "validation_failed"}
continue
qa_data = qa_node.data
node_name = qa_data.name
if not qa_data.qa_enabled:
logger.debug(f"QA node '{node_name}' is disabled, skipping")
continue
skip_reason = _should_skip_qa(qa_data, workflow_run)
if skip_reason:
logger.info(f"Skipping QA node '{node_name}' (#{node_id}): {skip_reason}")
results[f"qa_{node_id}"] = {"skipped": True, "reason": skip_reason}
continue
try:
logger.info(f"Running QA analysis for node '{node_name}' (#{node_id})")
result = await run_per_node_qa_analysis(
qa_data,
workflow_run,
workflow_run_id,
workflow_definition,
definition_id,
)
results[f"qa_{node_id}"] = result
# Log summary from node_results
node_results = result.get("node_results", {})
logger.info(
f"QA analysis complete for '{node_name}': "
f"{len(node_results)} nodes analyzed"
)
except Exception as e:
logger.error(f"QA analysis failed for node '{node_name}': {e}")
results[f"qa_{node_id}"] = {"error": str(e)}
return results
async def _update_usage_info_with_qa_tokens(
workflow_run_id: int,
workflow_run: WorkflowRunModel,
qa_results: Dict[str, Any],
) -> None:
"""Add QA analysis LLM token usage to the workflow run's usage_info."""
try:
usage_info = dict(workflow_run.usage_info or {})
llm_usage = dict(usage_info.get("llm", {}))
for _node_key, result in qa_results.items():
token_usage = result.get("token_usage")
model = result.get("model")
if not token_usage or not model:
continue
key = f"QAAnalysis|||{model}"
if key in llm_usage:
# Aggregate if multiple QA nodes use the same model
existing = llm_usage[key]
for field in (
"prompt_tokens",
"completion_tokens",
"total_tokens",
"cache_read_input_tokens",
):
existing[field] = (existing.get(field) or 0) + (
token_usage.get(field) or 0
)
else:
llm_usage[key] = token_usage
usage_info["llm"] = llm_usage
await db_client.update_workflow_run(
run_id=workflow_run_id, usage_info=usage_info
)
logger.info(f"Updated usage_info with QA token usage for run {workflow_run_id}")
except Exception as e:
logger.error(f"Failed to update usage_info with QA tokens: {e}")
async def run_integrations_post_workflow_run(_ctx, workflow_run_id: int):
"""
Run integrations after a workflow run completes.
This function:
1. Gets the workflow run and its contexts
2. Runs QA analysis nodes (if any)
3. Stores QA results in annotations
4. Executes webhook nodes with QA results available in render context
"""
set_current_run_id(workflow_run_id)
logger.info("Running integrations for workflow run")
try:
# Step 1: Get workflow run with full context
workflow_run, organization_id = await db_client.get_workflow_run_with_context(
workflow_run_id
)
if not workflow_run or not workflow_run.workflow:
logger.warning("Workflow run or workflow not found")
return
if not organization_id:
logger.warning("No organization found, skipping integrations")
return
# Set org context for tracing and register org-specific Langfuse credentials
# FIXME: If an org removes langfuse credentials during an exisitng deployment
# we should unregister an existing langfuse credentials for that org.
set_current_org_id(organization_id)
langfuse_config = await db_client.get_configuration_value(
organization_id,
OrganizationConfigurationKey.LANGFUSE_CREDENTIALS.value,
)
if langfuse_config:
register_org_langfuse_credentials(
org_id=organization_id,
host=langfuse_config.get("host"),
public_key=langfuse_config.get("public_key"),
secret_key=langfuse_config.get("secret_key"),
)
# Step 2: Get workflow definition from the run's pinned version
workflow_definition = workflow_run.definition.workflow_json
definition_id = workflow_run.definition.id
if not workflow_definition:
logger.debug("No workflow definition, skipping integrations")
return
# Step 3: Extract integration nodes
nodes = workflow_definition.get("nodes", [])
qa_nodes = [n for n in nodes if n.get("type") == "qa"]
webhook_nodes = [n for n in nodes if n.get("type") == "webhook"]
# Step 4: Generate public access token if webhooks exist or campaign_id is set
has_campaign = workflow_run.campaign_id is not None
if not webhook_nodes and not qa_nodes and not has_campaign:
logger.debug("No integration nodes and no campaign, skipping")
return
public_token = None
if webhook_nodes or has_campaign:
public_token = await db_client.ensure_public_access_token(workflow_run_id)
# Step 5: Run QA analysis before webhooks
if qa_nodes:
logger.info(f"Found {len(qa_nodes)} QA nodes to execute")
qa_results = await _run_qa_nodes(
qa_nodes,
workflow_run,
workflow_run_id,
workflow_definition,
definition_id,
)
if qa_results:
# Add QA token usage to workflow run's usage_info
await _update_usage_info_with_qa_tokens(
workflow_run_id, workflow_run, qa_results
)
# Collect unique tags across all QA node results for top-level filtering
all_tags: set[str] = set()
for qa_key, qa_result in qa_results.items():
for node_result in qa_result.get("node_results", {}).values():
for tag in node_result.get("tags", []):
if isinstance(tag, str):
all_tags.add(tag)
elif isinstance(tag, dict) and "tag" in tag:
all_tags.add(tag["tag"])
if all_tags:
qa_results["tags"] = sorted(all_tags)
await db_client.update_workflow_run(
workflow_run_id, annotations=qa_results
)
# Re-fetch workflow_run to get updated annotations
workflow_run, _ = await db_client.get_workflow_run_with_context(
workflow_run_id
)
# Step 6: Execute webhooks
if not webhook_nodes:
logger.debug("No webhook nodes in workflow")
return
logger.info(f"Found {len(webhook_nodes)} webhook nodes to execute")
# Step 7: Build render context (includes annotations from QA)
render_context = _build_render_context(workflow_run, public_token)
# Step 8: Execute each webhook node
for node in webhook_nodes:
node_id = node.get("id", "unknown")
try:
webhook_node = WebhookRFNode.model_validate(node)
except ValidationError as e:
logger.warning(
f"Webhook node #{node_id} failed validation, skipping: {e}"
)
continue
webhook_data = webhook_node.data
try:
await _execute_webhook_node(
webhook_data=webhook_data,
render_context=render_context,
organization_id=organization_id,
)
except Exception as e:
logger.warning(f"Failed to execute webhook '{webhook_data.name}': {e}")
except Exception as e:
logger.error(f"Error running integrations: {e}", exc_info=True)
raise
def _build_render_context(
workflow_run: WorkflowRunModel, public_token: Optional[str] = None
) -> Dict[str, Any]:
"""Build the context dict for template rendering.
Args:
workflow_run: The workflow run model
public_token: Optional public access token for download URLs
Returns:
Dict containing all fields available for template rendering
"""
context = {
# Top-level fields
"workflow_run_id": workflow_run.id,
"workflow_run_name": workflow_run.name,
"workflow_id": workflow_run.workflow_id,
"workflow_name": workflow_run.workflow.name if workflow_run.workflow else None,
# Nested contexts
"initial_context": workflow_run.initial_context or {},
"gathered_context": workflow_run.gathered_context or {},
"cost_info": workflow_run.usage_info or {},
# Annotations (includes QA results)
"annotations": workflow_run.annotations or {},
}
# Add public download URLs if token is available
if public_token:
base_url = (
f"{BACKEND_API_ENDPOINT}/api/v1/public/download/workflow/{public_token}"
)
context["recording_url"] = (
f"{base_url}/recording" if workflow_run.recording_url else None
)
context["transcript_url"] = (
f"{base_url}/transcript" if workflow_run.transcript_url else None
)
else:
context["recording_url"] = workflow_run.recording_url
context["transcript_url"] = workflow_run.transcript_url
return context
async def _execute_webhook_node(
webhook_data: WebhookNodeData,
render_context: Dict[str, Any],
organization_id: int,
) -> bool:
"""
Execute a single webhook node.
Args:
webhook_data: The validated webhook node data
render_context: Context for template rendering
organization_id: For credential lookup
Returns:
True if successful, False otherwise
"""
webhook_name = webhook_data.name
if not webhook_data.enabled:
logger.debug(f"Webhook '{webhook_name}' is disabled, skipping")
return True
url = webhook_data.endpoint_url
if not url:
logger.warning(f"Webhook '{webhook_name}' has no endpoint URL")
return False
headers = {"Content-Type": "application/json"}
credential_uuid = webhook_data.credential_uuid
if credential_uuid:
credential = await db_client.get_credential_by_uuid(
credential_uuid, organization_id
)
if credential:
auth_header = build_auth_header(credential)
headers.update(auth_header)
logger.debug(f"Applied credential '{credential.name}' to webhook")
else:
logger.warning(
f"Credential {credential_uuid} not found for webhook '{webhook_name}'"
)
for h in webhook_data.custom_headers or []:
if h.key and h.value:
headers[h.key] = h.value
payload = render_template(webhook_data.payload_template or {}, render_context)
method = (webhook_data.http_method or "POST").upper()
logger.info(f"Executing webhook '{webhook_name}': {method}")
try:
async with httpx.AsyncClient() as client:
if method in ("POST", "PUT", "PATCH"):
response = await client.request(
method=method,
url=url,
json=payload,
headers=headers,
timeout=30.0,
)
else: # GET, DELETE
response = await client.request(
method=method,
url=url,
headers=headers,
timeout=30.0,
)
response.raise_for_status()
logger.info(f"Webhook '{webhook_name}' succeeded: {response.status_code}")
return True
except httpx.HTTPStatusError as e:
logger.warning(
f"Webhook '{webhook_name}' failed: {e.response.status_code} - {e.response.text[:200]}"
)
return False
except httpx.RequestError as e:
logger.warning(f"Webhook '{webhook_name}' request error: {e}")
return False
except Exception as e:
logger.error(f"Webhook '{webhook_name}' unexpected error: {e}")
return False