dograh/api/tasks/run_integrations.py

228 lines
7.6 KiB
Python
Raw Normal View History

"""Execute webhook integrations after workflow run completion."""
import base64
from typing import Any, Dict
2025-09-09 14:37:32 +05:30
import httpx
from loguru import logger
from api.db import db_client
from api.db.models import ExternalCredentialModel, WorkflowRunModel
2025-09-09 14:37:32 +05:30
from api.utils.template_renderer import render_template
from pipecat.utils.context import set_current_run_id
2025-09-09 14:37:32 +05:30
async def run_integrations_post_workflow_run(_ctx, workflow_run_id: int):
2025-09-09 14:37:32 +05:30
"""
Run webhook integrations after a workflow run completes.
2025-09-09 14:37:32 +05:30
This function:
1. Gets the workflow run and its contexts
2. Extracts webhook nodes from workflow definition
3. Executes each enabled webhook node
2025-09-09 14:37:32 +05:30
"""
set_current_run_id(workflow_run_id)
logger.info("Running webhook integrations for workflow run")
2025-09-09 14:37:32 +05:30
try:
# Step 1: Get workflow run with full context
2025-09-09 14:37:32 +05:30
workflow_run, organization_id = await db_client.get_workflow_run_with_context(
workflow_run_id
)
if not workflow_run or not workflow_run.workflow:
logger.error("Workflow run or workflow not found")
2025-09-09 14:37:32 +05:30
return
if not organization_id:
logger.warning("No organization found, skipping webhooks")
2025-09-09 14:37:32 +05:30
return
# Step 2: Get workflow definition
workflow_definition = workflow_run.workflow.workflow_definition_with_fallback
if not workflow_definition:
logger.debug("No workflow definition, skipping webhooks")
2025-09-09 14:37:32 +05:30
return
# Step 3: Extract webhook nodes
nodes = workflow_definition.get("nodes", [])
webhook_nodes = [n for n in nodes if n.get("type") == "webhook"]
2025-09-09 14:37:32 +05:30
if not webhook_nodes:
logger.debug("No webhook nodes in workflow")
2025-09-09 14:37:32 +05:30
return
logger.info(f"Found {len(webhook_nodes)} webhook nodes to execute")
2025-09-09 14:37:32 +05:30
# Step 4: Build render context
render_context = _build_render_context(workflow_run)
2025-09-09 14:37:32 +05:30
# Step 5: Execute each webhook node
for node in webhook_nodes:
webhook_data = node.get("data", {})
try:
await _execute_webhook_node(
webhook_data=webhook_data,
render_context=render_context,
organization_id=organization_id,
)
except Exception as e:
# Log error but continue with other webhooks
logger.error(
f"Failed to execute webhook '{webhook_data.get('name', 'unknown')}': {e}"
)
2025-09-09 14:37:32 +05:30
except Exception as e:
logger.error(f"Error running webhook integrations: {e}", exc_info=True)
2025-09-09 14:37:32 +05:30
raise
def _build_render_context(workflow_run: WorkflowRunModel) -> Dict[str, Any]:
"""Build the context dict for template rendering."""
return {
# Top-level fields
"workflow_run_id": workflow_run.id,
"workflow_run_name": workflow_run.name,
"workflow_id": workflow_run.workflow_id,
"workflow_name": workflow_run.workflow.name if workflow_run.workflow else None,
# Nested contexts
"initial_context": workflow_run.initial_context or {},
"gathered_context": workflow_run.gathered_context or {},
"cost_info": workflow_run.usage_info or {},
"recording_url": getattr(workflow_run, "recording_url", None),
"transcript_url": getattr(workflow_run, "transcript_url", None),
}
async def _execute_webhook_node(
webhook_data: Dict[str, Any],
render_context: Dict[str, Any],
organization_id: int,
) -> bool:
2025-09-09 14:37:32 +05:30
"""
Execute a single webhook node.
2025-09-09 14:37:32 +05:30
Args:
webhook_data: The webhook node's data dict from workflow definition
render_context: Context for template rendering
organization_id: For credential lookup
2025-09-09 14:37:32 +05:30
Returns:
True if successful, False otherwise
2025-09-09 14:37:32 +05:30
"""
webhook_name = webhook_data.get("name", "Unnamed Webhook")
# 1. Check if enabled
if not webhook_data.get("enabled", True):
logger.debug(f"Webhook '{webhook_name}' is disabled, skipping")
return True
# 2. Validate endpoint URL
url = webhook_data.get("endpoint_url")
if not url:
logger.error(f"Webhook '{webhook_name}' has no endpoint URL")
return False
# 3. Build headers
headers = {"Content-Type": "application/json"}
# 4. Add auth header if credential configured
credential_uuid = webhook_data.get("credential_uuid")
if credential_uuid:
credential = await db_client.get_credential_by_uuid(
credential_uuid, organization_id
)
if credential:
auth_header = _build_auth_header(credential)
headers.update(auth_header)
logger.debug(f"Applied credential '{credential.name}' to webhook")
2025-09-09 14:37:32 +05:30
else:
logger.warning(
f"Credential {credential_uuid} not found for webhook '{webhook_name}'"
2025-09-09 14:37:32 +05:30
)
# 5. Add custom headers
custom_headers = webhook_data.get("custom_headers", [])
for h in custom_headers:
if h.get("key") and h.get("value"):
headers[h["key"]] = h["value"]
2025-09-09 14:37:32 +05:30
# 6. Render payload template
payload_template = webhook_data.get("payload_template", {})
payload = render_template(payload_template, render_context)
2025-09-09 14:37:32 +05:30
# 7. Make HTTP request
method = webhook_data.get("http_method", "POST").upper()
2025-09-09 14:37:32 +05:30
logger.info(f"Executing webhook '{webhook_name}': {method}")
2025-09-09 14:37:32 +05:30
try:
async with httpx.AsyncClient() as client:
if method in ("POST", "PUT", "PATCH"):
response = await client.request(
method=method,
url=url,
json=payload,
headers=headers,
timeout=30.0,
)
else: # GET, DELETE
response = await client.request(
method=method,
url=url,
headers=headers,
timeout=30.0,
)
2025-09-09 14:37:32 +05:30
response.raise_for_status()
logger.info(f"Webhook '{webhook_name}' succeeded: {response.status_code}")
return True
2025-09-09 14:37:32 +05:30
except httpx.HTTPStatusError as e:
logger.error(
f"Webhook '{webhook_name}' failed: {e.response.status_code} - {e.response.text[:200]}"
2025-09-09 14:37:32 +05:30
)
return False
except httpx.RequestError as e:
logger.error(f"Webhook '{webhook_name}' request error: {e}")
return False
except Exception as e:
logger.error(f"Webhook '{webhook_name}' unexpected error: {e}")
return False
2025-09-09 14:37:32 +05:30
def _build_auth_header(credential: ExternalCredentialModel) -> Dict[str, str]:
"""
Build authentication header based on credential type.
2025-09-09 14:37:32 +05:30
Args:
credential: The credential model
2025-09-09 14:37:32 +05:30
Returns:
Dict with header name and value
"""
cred_type = credential.credential_type
cred_data = credential.credential_data or {}
if cred_type == "bearer_token":
token = cred_data.get("token", "")
return {"Authorization": f"Bearer {token}"}
elif cred_type == "api_key":
header_name = cred_data.get("header_name", "X-API-Key")
api_key = cred_data.get("api_key", "")
return {header_name: api_key}
elif cred_type == "basic_auth":
username = cred_data.get("username", "")
password = cred_data.get("password", "")
encoded = base64.b64encode(f"{username}:{password}".encode()).decode()
return {"Authorization": f"Basic {encoded}"}
elif cred_type == "custom_header":
header_name = cred_data.get("header_name", "X-Custom")
header_value = cred_data.get("header_value", "")
return {header_name: header_value}
return {}