dograh/api/tasks/run_integrations.py

228 lines
8 KiB
Python
Raw Normal View History

2025-09-09 14:37:32 +05:30
import os
import aiohttp
import httpx
from loguru import logger
from pipecat.utils.context import set_current_run_id
from api.db import db_client
from api.db.models import IntegrationModel
from api.enums import OrganizationConfigurationKey, WorkflowRunMode
from api.utils.template_renderer import render_template
async def run_integrations_post_workflow_run(ctx, workflow_run_id: int):
"""
Run integrations after a workflow run completes.
This function:
1. Gets the workflow run and its gathered_context
2. Determines the organization_id through the workflow -> user -> organization chain
3. Fetches all active integrations for that organization
4. For Slack integrations, sends the gathered_context to the webhook URL
Args:
workflow_run_id: The ID of the completed workflow run
"""
# Set the workflow_run_id in context variable for consistent logging format
set_current_run_id(workflow_run_id)
logger.info("Running integrations for workflow run")
try:
# Step 1: Get workflow run details with gathered_context using DB client
workflow_run, organization_id = await db_client.get_workflow_run_with_context(
workflow_run_id
)
if not workflow_run:
logger.error("Workflow run not found")
return
if not workflow_run.workflow:
logger.error("Workflow not found for workflow run")
return
if not workflow_run.workflow.user:
logger.error("User not found for workflow run")
return
gathered_context = workflow_run.gathered_context
initial_context = workflow_run.initial_context
if not gathered_context:
logger.info("No gathered context for workflow run, skipping integrations")
return
# Check if workflow run mode is stasis and sync with vendor
if workflow_run.mode == WorkflowRunMode.STASIS.value:
await _sync_vendor_data(initial_context, gathered_context)
# Step 2: Check if organization_id is available
if not organization_id:
logger.warning(
f"No organization found for workflow run, skipping integrations"
)
return
logger.debug(f"Found organization_id {organization_id} for workflow run")
# Step 3: Get all active integrations for the organization using DB client
integrations = await db_client.get_active_integrations_by_organization(
organization_id
)
logger.info(
f"Found {len(integrations)} active integrations for organization {organization_id}"
)
# Step 4: Process each integration
for integration in integrations:
await _process_integration(integration, gathered_context)
except Exception as e:
logger.error(f"Error running integrations for workflow run: {str(e)}")
raise
async def _sync_vendor_data(initial_context: dict, gathered_context: dict):
"""
Sync data with external vendor for stasis mode workflow runs.
Args:
initial_context: The initial context containing lead_id
gathered_context: The gathered context containing mapped_call_disposition
"""
if not os.getenv("ARI_DATA_SYNCING_URI"):
logger.info("ARI_DATA_SYNCING_URI not configured, skipping vendor sync")
return
try:
lead_id = initial_context.get("lead_id")
status = gathered_context.get("mapped_call_disposition")
if lead_id and status:
ari_data_uri = os.getenv("ARI_DATA_SYNCING_URI")
# Add URL params to the base URL
sync_url = f"{ari_data_uri}&lead_id={lead_id}&status={status}"
async with httpx.AsyncClient() as client:
response = await client.post(sync_url, timeout=10.0)
response.raise_for_status()
logger.info(
f"Successfully synced data for lead_id: {lead_id} with status: {status}"
)
else:
logger.warning(
f"Missing lead_id or status for syncing - lead_id: {lead_id}, status: {status}"
)
except Exception as e:
logger.error(f"Failed to sync data to ARI_DATA_SYNCING_URI: {e}")
async def _process_integration(
integration: IntegrationModel,
gathered_context: dict,
):
"""
Process a single integration.
Args:
integration: The integration model
gathered_context: The gathered context from the workflow run
workflow_run_name: Name of the workflow run
run_id: The workflow run ID for logging context
"""
logger.info(
f"Processing integration {integration.id} (provider: {integration.provider})"
)
try:
if integration.provider.lower() == "slack":
await _process_slack_integration(integration, gathered_context)
else:
logger.info(
f"Integration provider '{integration.provider}' not supported yet"
)
except Exception as e:
logger.error(f"Error processing integration {integration.id}: {str(e)}")
async def _process_slack_integration(
integration: IntegrationModel, gathered_context: dict
):
"""
Process a Slack integration by sending gathered_context to the webhook.
Args:
integration: The Slack integration model
gathered_context: The gathered context from the workflow run
workflow_run_name: Name of the workflow run
run_id: The workflow run ID for logging context
"""
logger.info(f"Processing Slack integration {integration.id}")
# TODO: Generalise this, currently tailored to Kapil's use case
if gathered_context.get("mapped_call_disposition") != "XFER":
logger.debug(
f"Not sending message on slack since not XFER: {gathered_context.get('mapped_call_disposition')}"
)
return
try:
# Extract webhook URL from connection_details
connection_details = integration.connection_details
if not connection_details:
logger.error(
f"No connection details found for Slack integration {integration.id}"
)
return
# Navigate to incoming_webhook.url in the connection_details
webhook_url = connection_details.get("connection_config", {}).get(
"incoming_webhook.url"
)
if not webhook_url:
logger.error(
f"No incoming_webhook found in connection details for integration {integration.id}"
)
return
logger.info(f"Found Slack webhook URL for integration {integration.id}")
# Get message template configuration
# Get organization_id from the integration model
organization_id = integration.organisation_id
message_templates = await db_client.get_configuration_value(
organization_id,
OrganizationConfigurationKey.DISPOSITION_MESSAGE_TEMPLATE.value,
default={},
)
# Check if there's a custom template for Slack
slack_template = message_templates.get("slack", {})
rendered_text = render_template(slack_template, gathered_context)
slack_message = {"text": rendered_text}
# Send to Slack webhook
async with aiohttp.ClientSession() as session:
async with session.post(
webhook_url,
json=slack_message,
headers={"Content-Type": "application/json"},
) as response:
if response.status == 200:
logger.info(
f"Successfully sent message to Slack for integration {integration.id}"
)
else:
error_text = await response.text()
logger.error(
f"Failed to send Slack message for integration {integration.id}: {response.status} - {error_text}"
)
except Exception as e:
logger.error(f"Error processing Slack integration {integration.id}: {str(e)}")