mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-25 08:48:13 +02:00
Feat/inbound telephony (#113)
* feat: inbound telephony (twilio & vobiz) * chore: add ruff and lint formatting * fix: add missing cloudonix interface compliance implementation
This commit is contained in:
parent
b79bc4221d
commit
97fbd9b37b
22 changed files with 1998 additions and 40 deletions
|
|
@ -1,27 +1,42 @@
|
|||
"""
|
||||
Generic telephony routes that work with any telephony provider.
|
||||
Telephony routes - handles all telephony-related endpoints.
|
||||
Consolidated from split modules for easier maintenance.
|
||||
"""
|
||||
|
||||
import json
|
||||
import random
|
||||
import uuid
|
||||
from datetime import UTC, datetime
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, Header, HTTPException, Request, WebSocket
|
||||
from loguru import logger
|
||||
from pipecat.utils.context import set_current_run_id
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.future import select
|
||||
from starlette.responses import HTMLResponse
|
||||
|
||||
from api.db import db_client
|
||||
from api.db.models import UserModel
|
||||
from api.enums import WorkflowRunState
|
||||
from api.db.models import OrganizationConfigurationModel, UserModel
|
||||
from api.db.workflow_client import WorkflowClient
|
||||
from api.db.workflow_run_client import WorkflowRunClient
|
||||
from api.enums import CallType, OrganizationConfigurationKey, WorkflowRunState
|
||||
from api.errors.telephony_errors import TelephonyError
|
||||
from api.services.auth.depends import get_user
|
||||
from api.services.campaign.call_dispatcher import campaign_call_dispatcher
|
||||
from api.services.campaign.campaign_event_publisher import get_campaign_event_publisher
|
||||
from api.services.quota_service import check_dograh_quota
|
||||
from api.services.telephony.factory import get_telephony_provider
|
||||
from api.services.quota_service import check_dograh_quota, check_dograh_quota_by_user_id
|
||||
from api.services.telephony.factory import (
|
||||
get_all_telephony_providers,
|
||||
get_telephony_provider,
|
||||
)
|
||||
from api.utils.telephony_helper import (
|
||||
generic_hangup_response,
|
||||
normalize_webhook_data,
|
||||
numbers_match,
|
||||
parse_webhook_request,
|
||||
)
|
||||
from api.utils.tunnel import TunnelURLProvider
|
||||
from pipecat.utils.context import set_current_run_id
|
||||
|
||||
router = APIRouter(prefix="/telephony")
|
||||
|
||||
|
|
@ -122,15 +137,18 @@ async def initiate_call(
|
|||
workflow_run_id = request.workflow_run_id
|
||||
|
||||
if not workflow_run_id:
|
||||
workflow_run_name = f"WR-TEL-{random.randint(1000, 9999)}"
|
||||
numeric_suffix = int(str(uuid.uuid4()).replace("-", "")[:8], 16) % 100000000
|
||||
workflow_run_name = f"WR-TEL-OUT-{numeric_suffix:08d}"
|
||||
workflow_run = await db_client.create_workflow_run(
|
||||
workflow_run_name,
|
||||
request.workflow_id,
|
||||
workflow_run_mode,
|
||||
user_id=user.id,
|
||||
call_type=CallType.OUTBOUND,
|
||||
initial_context={
|
||||
"phone_number": phone_number,
|
||||
"provider": provider.PROVIDER_NAME,
|
||||
},
|
||||
user_id=user.id,
|
||||
)
|
||||
workflow_run_id = workflow_run.id
|
||||
else:
|
||||
|
|
@ -174,6 +192,255 @@ async def initiate_call(
|
|||
return {"message": f"Call initiated successfully with run name {workflow_run_name}"}
|
||||
|
||||
|
||||
async def _verify_organization_phone_number(
|
||||
phone_number: str,
|
||||
organization_id: int,
|
||||
to_country: str = None,
|
||||
from_country: str = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Verify that a phone number belongs to the specified organization.
|
||||
|
||||
Args:
|
||||
phone_number: The phone number to verify
|
||||
organization_id: The organization ID to check against
|
||||
to_country: ISO country code for the called number (e.g., "US", "IN")
|
||||
from_country: ISO country code for the caller (e.g., "IN", "GB")
|
||||
|
||||
Returns:
|
||||
True if the phone number belongs to the organization, False otherwise
|
||||
"""
|
||||
try:
|
||||
async with db_client.async_session() as session:
|
||||
result = await session.execute(
|
||||
select(OrganizationConfigurationModel).where(
|
||||
OrganizationConfigurationModel.organization_id == organization_id,
|
||||
OrganizationConfigurationModel.key
|
||||
== OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
|
||||
)
|
||||
)
|
||||
|
||||
config = result.scalars().first()
|
||||
|
||||
if not config or not config.value:
|
||||
logger.warning(
|
||||
f"No telephony configuration found for organization {organization_id}"
|
||||
)
|
||||
return False
|
||||
|
||||
from_numbers = config.value.get("from_numbers", [])
|
||||
logger.debug(
|
||||
f"Organization {organization_id} has from_numbers: {from_numbers}"
|
||||
)
|
||||
|
||||
for configured_number in from_numbers:
|
||||
if numbers_match(
|
||||
phone_number, configured_number, to_country, from_country
|
||||
):
|
||||
logger.info(
|
||||
f"Phone number {phone_number} verified for organization {organization_id} "
|
||||
f"(matches {configured_number}, to_country={to_country}, from_country={from_country})"
|
||||
)
|
||||
return True
|
||||
|
||||
logger.warning(
|
||||
f"Phone number {phone_number} not found in organization {organization_id} from_numbers: {from_numbers} "
|
||||
f"(to_country={to_country}, from_country={from_country})"
|
||||
)
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error verifying phone number {phone_number} for organization {organization_id}: {e}"
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
async def _detect_provider(webhook_data: dict, headers: dict):
|
||||
"""Detect which telephony provider can handle this webhook"""
|
||||
provider_classes = await get_all_telephony_providers()
|
||||
|
||||
for provider_class in provider_classes:
|
||||
if provider_class.can_handle_webhook(webhook_data, headers):
|
||||
return provider_class
|
||||
|
||||
logger.warning(f"No provider found for webhook data: {webhook_data.keys()}")
|
||||
return None
|
||||
|
||||
|
||||
async def _validate_inbound_request(
|
||||
workflow_id: int,
|
||||
provider_class,
|
||||
normalized_data,
|
||||
webhook_data: dict,
|
||||
webhook_body: str = "",
|
||||
x_twilio_signature: str = None,
|
||||
x_vobiz_signature: str = None,
|
||||
x_vobiz_timestamp: str = None,
|
||||
) -> tuple[bool, TelephonyError, dict, object]:
|
||||
"""
|
||||
Validate all aspects of inbound request.
|
||||
Returns: (is_valid, error_type, workflow_context, provider_instance)
|
||||
"""
|
||||
|
||||
workflow = await db_client.get_workflow(workflow_id)
|
||||
if not workflow:
|
||||
return False, TelephonyError.WORKFLOW_NOT_FOUND, {}, None
|
||||
|
||||
organization_id = workflow.organization_id
|
||||
user_id = workflow.user_id
|
||||
provider = normalized_data.provider
|
||||
|
||||
# Validate provider and account_id
|
||||
validation_result = await _validate_organization_provider_config(
|
||||
organization_id, provider_class, normalized_data.account_id
|
||||
)
|
||||
if validation_result != TelephonyError.VALID:
|
||||
return False, validation_result, {}, None
|
||||
|
||||
# Verify phone number belongs to organization
|
||||
is_valid = await _verify_organization_phone_number(
|
||||
normalized_data.to_number,
|
||||
organization_id,
|
||||
normalized_data.to_country,
|
||||
normalized_data.from_country,
|
||||
)
|
||||
if not is_valid:
|
||||
return False, TelephonyError.PHONE_NUMBER_NOT_CONFIGURED, {}, None
|
||||
|
||||
# Verify webhook signature if provided
|
||||
provider_instance = None
|
||||
if x_twilio_signature or x_vobiz_signature:
|
||||
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
|
||||
webhook_url = (
|
||||
f"https://{backend_endpoint}/api/v1/telephony/inbound/{workflow_id}"
|
||||
)
|
||||
|
||||
# Get the real telephony provider with actual credentials for signature verification
|
||||
provider_instance = await get_telephony_provider(organization_id)
|
||||
|
||||
if provider_class.PROVIDER_NAME == "twilio" and x_twilio_signature:
|
||||
logger.info(f"Verifying Twilio signature for URL: {webhook_url}")
|
||||
signature_valid = await provider_instance.verify_inbound_signature(
|
||||
webhook_url, webhook_data, x_twilio_signature
|
||||
)
|
||||
elif provider_class.PROVIDER_NAME == "vobiz" and x_vobiz_signature:
|
||||
logger.info(f"Verifying Vobiz signature for URL: {webhook_url}")
|
||||
signature_valid = await provider_instance.verify_inbound_signature(
|
||||
webhook_url,
|
||||
webhook_data,
|
||||
x_vobiz_signature,
|
||||
x_vobiz_timestamp,
|
||||
webhook_body,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"No signature validation for provider {provider_class.PROVIDER_NAME}"
|
||||
)
|
||||
signature_valid = True
|
||||
|
||||
logger.info(f"Signature validation result: {signature_valid}")
|
||||
if not signature_valid:
|
||||
return (
|
||||
False,
|
||||
TelephonyError.SIGNATURE_VALIDATION_FAILED,
|
||||
{},
|
||||
provider_instance,
|
||||
)
|
||||
|
||||
# Return success with workflow context
|
||||
workflow_context = {
|
||||
"workflow": workflow,
|
||||
"organization_id": organization_id,
|
||||
"user_id": user_id,
|
||||
"provider": provider,
|
||||
}
|
||||
return (
|
||||
True,
|
||||
"",
|
||||
workflow_context,
|
||||
provider_instance,
|
||||
) # TODO: do we still need instance in the client code
|
||||
|
||||
|
||||
async def _create_inbound_workflow_run(
|
||||
workflow_id: int, user_id: int, provider: str, normalized_data, data_source: str
|
||||
) -> int:
|
||||
"""Create workflow run for inbound call and return run ID"""
|
||||
call_id = normalized_data.call_id
|
||||
numeric_suffix = int(str(uuid.uuid4()).replace("-", "")[:8], 16) % 100000000
|
||||
workflow_run_name = f"WR-TEL-IN-{numeric_suffix:08d}"
|
||||
|
||||
workflow_run = await db_client.create_workflow_run(
|
||||
workflow_run_name,
|
||||
workflow_id,
|
||||
provider, # Use detected provider as mode
|
||||
user_id=user_id,
|
||||
call_type=CallType.INBOUND,
|
||||
initial_context={
|
||||
"caller_number": normalized_data.from_number,
|
||||
"called_number": normalized_data.to_number,
|
||||
"direction": "inbound",
|
||||
"call_id": call_id,
|
||||
"account_id": normalized_data.account_id,
|
||||
"provider": provider,
|
||||
"data_source": data_source,
|
||||
"from_country": normalized_data.from_country,
|
||||
"to_country": normalized_data.to_country,
|
||||
"raw_webhook_data": normalized_data.raw_data,
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Created inbound workflow run {workflow_run.id} for {provider} call {call_id}"
|
||||
)
|
||||
return workflow_run.id
|
||||
|
||||
|
||||
async def _validate_organization_provider_config(
|
||||
organization_id: int, provider_class, account_id: str
|
||||
) -> TelephonyError:
|
||||
"""Validate provider and account_id, returning specific error type"""
|
||||
if not account_id:
|
||||
logger.warning(
|
||||
f"No account_id provided for provider {provider_class.PROVIDER_NAME}"
|
||||
)
|
||||
return TelephonyError.ACCOUNT_VALIDATION_FAILED
|
||||
|
||||
try:
|
||||
config = await db_client.get_configuration(
|
||||
organization_id,
|
||||
OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
|
||||
)
|
||||
|
||||
if not config or not config.value:
|
||||
logger.warning(
|
||||
f"No telephony configuration found for organization {organization_id}"
|
||||
)
|
||||
return TelephonyError.ACCOUNT_VALIDATION_FAILED
|
||||
|
||||
stored_provider = config.value.get("provider")
|
||||
if stored_provider != provider_class.PROVIDER_NAME:
|
||||
logger.warning(
|
||||
f"Provider mismatch: webhook={provider_class.PROVIDER_NAME}, config={stored_provider}"
|
||||
)
|
||||
return TelephonyError.PROVIDER_MISMATCH
|
||||
|
||||
# Use provider-specific validation
|
||||
is_valid = provider_class.validate_account_id(config.value, account_id)
|
||||
if not is_valid:
|
||||
logger.warning(
|
||||
f"Account validation failed for {provider_class.PROVIDER_NAME}: webhook={account_id}"
|
||||
)
|
||||
return TelephonyError.ACCOUNT_VALIDATION_FAILED
|
||||
|
||||
return TelephonyError.VALID
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Exception during account validation: {e}")
|
||||
return TelephonyError.ACCOUNT_VALIDATION_FAILED
|
||||
|
||||
|
||||
@router.post("/twiml", include_in_schema=False)
|
||||
async def handle_twiml_webhook(
|
||||
workflow_id: int, user_id: int, workflow_run_id: int, organization_id: int
|
||||
|
|
@ -250,8 +517,14 @@ async def websocket_endpoint(
|
|||
|
||||
# Extract provider type from workflow run context
|
||||
provider_type = None
|
||||
if workflow_run.gathered_context:
|
||||
provider_type = workflow_run.gathered_context.get("provider")
|
||||
logger.info(
|
||||
f"Workflow run {workflow_run_id} gathered_context: {workflow_run.gathered_context}"
|
||||
)
|
||||
logger.info(f"Workflow run {workflow_run_id} mode: {workflow_run.mode}")
|
||||
|
||||
if workflow_run.initial_context:
|
||||
provider_type = workflow_run.initial_context.get("provider")
|
||||
logger.info(f"Extracted provider_type: {provider_type}")
|
||||
|
||||
if not provider_type:
|
||||
logger.error(
|
||||
|
|
@ -546,21 +819,78 @@ async def handle_vobiz_xml_webhook(
|
|||
async def handle_vobiz_hangup_callback(
|
||||
workflow_run_id: int,
|
||||
request: Request,
|
||||
x_vobiz_signature: Optional[str] = Header(None),
|
||||
x_vobiz_timestamp: Optional[str] = Header(None),
|
||||
):
|
||||
"""Handle Vobiz hangup callback (sent when call ends).
|
||||
|
||||
Vobiz sends callbacks to hangup_url when the call terminates.
|
||||
This includes call duration, status, and billing information.
|
||||
"""
|
||||
# TODO: Remove this debug logging after Vobiz team clarifies webhook authentication
|
||||
# Logging all headers and body to understand what Vobiz actually sends
|
||||
all_headers = dict(request.headers)
|
||||
logger.info(
|
||||
f"[run {workflow_run_id}] Vobiz hangup callback - Headers: {json.dumps(all_headers)}"
|
||||
)
|
||||
|
||||
# Parse the callback data (Vobiz sends form data or JSON)
|
||||
form_data = await request.form()
|
||||
callback_data = dict(form_data)
|
||||
|
||||
# TODO: Remove this debug logging after Vobiz team clarifies webhook authentication
|
||||
logger.info(
|
||||
f"[run {workflow_run_id}] Vobiz hangup callback - Body: {json.dumps(callback_data)}"
|
||||
)
|
||||
logger.info(
|
||||
f"[run {workflow_run_id}] Received Vobiz hangup callback {json.dumps(callback_data)}"
|
||||
)
|
||||
|
||||
# Get workflow run for processing
|
||||
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
|
||||
# Verify signature if provided
|
||||
if x_vobiz_signature:
|
||||
# We need the workflow run to get organization for provider credentials
|
||||
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
|
||||
if not workflow_run:
|
||||
logger.warning(
|
||||
f"[run {workflow_run_id}] Workflow run not found for signature verification"
|
||||
)
|
||||
return {"status": "error", "reason": "workflow_run_not_found"}
|
||||
|
||||
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
|
||||
if not workflow:
|
||||
logger.warning(
|
||||
f"[run {workflow_run_id}] Workflow not found for signature verification"
|
||||
)
|
||||
return {"status": "error", "reason": "workflow_not_found"}
|
||||
|
||||
provider = await get_telephony_provider(workflow.organization_id)
|
||||
|
||||
# Get raw body for signature verification
|
||||
raw_body = await request.body()
|
||||
webhook_body = raw_body.decode("utf-8")
|
||||
|
||||
# Verify signature
|
||||
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
|
||||
webhook_url = f"https://{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/{workflow_run_id}"
|
||||
|
||||
is_valid = await provider.verify_webhook_signature(
|
||||
webhook_url,
|
||||
callback_data,
|
||||
x_vobiz_signature,
|
||||
x_vobiz_timestamp,
|
||||
webhook_body,
|
||||
)
|
||||
|
||||
if not is_valid:
|
||||
logger.warning(
|
||||
f"[run {workflow_run_id}] Invalid Vobiz hangup callback signature"
|
||||
)
|
||||
return {"status": "error", "reason": "invalid_signature"}
|
||||
|
||||
logger.info(f"[run {workflow_run_id}] Vobiz hangup callback signature verified")
|
||||
else:
|
||||
# Get workflow run for processing (signature verification already got it if needed)
|
||||
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
|
||||
if not workflow_run:
|
||||
logger.warning(
|
||||
f"[run {workflow_run_id}] Workflow run not found for Vobiz hangup callback"
|
||||
|
|
@ -609,21 +939,79 @@ async def handle_vobiz_hangup_callback(
|
|||
async def handle_vobiz_ring_callback(
|
||||
workflow_run_id: int,
|
||||
request: Request,
|
||||
x_vobiz_signature: Optional[str] = Header(None),
|
||||
x_vobiz_timestamp: Optional[str] = Header(None),
|
||||
):
|
||||
"""Handle Vobiz ring callback (sent when call starts ringing).
|
||||
|
||||
Vobiz can send callbacks to ring_url when the call starts ringing.
|
||||
This is optional and used for tracking ringing status.
|
||||
"""
|
||||
# TODO: Remove this debug logging after Vobiz team clarifies webhook authentication
|
||||
# Logging all headers and body to understand what Vobiz actually sends
|
||||
all_headers = dict(request.headers)
|
||||
logger.info(
|
||||
f"[run {workflow_run_id}] Vobiz ring callback - Headers: {json.dumps(all_headers)}"
|
||||
)
|
||||
|
||||
# Parse the callback data
|
||||
form_data = await request.form()
|
||||
callback_data = dict(form_data)
|
||||
|
||||
# TODO: Remove this debug logging after Vobiz team clarifies webhook authentication
|
||||
logger.info(
|
||||
f"[run {workflow_run_id}] Vobiz ring callback - Body: {json.dumps(callback_data)}"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"[run {workflow_run_id}] Received Vobiz ring callback {json.dumps(callback_data)}"
|
||||
)
|
||||
|
||||
# Get workflow run for processing
|
||||
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
|
||||
# Verify signature if provided
|
||||
if x_vobiz_signature:
|
||||
# We need the workflow run to get organization for provider credentials
|
||||
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
|
||||
if not workflow_run:
|
||||
logger.warning(
|
||||
f"[run {workflow_run_id}] Workflow run not found for signature verification"
|
||||
)
|
||||
return {"status": "error", "reason": "workflow_run_not_found"}
|
||||
|
||||
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
|
||||
if not workflow:
|
||||
logger.warning(
|
||||
f"[run {workflow_run_id}] Workflow not found for signature verification"
|
||||
)
|
||||
return {"status": "error", "reason": "workflow_not_found"}
|
||||
|
||||
provider = await get_telephony_provider(workflow.organization_id)
|
||||
|
||||
# Get raw body for signature verification
|
||||
raw_body = await request.body()
|
||||
webhook_body = raw_body.decode("utf-8")
|
||||
|
||||
# Verify signature
|
||||
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
|
||||
webhook_url = f"https://{backend_endpoint}/api/v1/telephony/vobiz/ring-callback/{workflow_run_id}"
|
||||
|
||||
is_valid = await provider.verify_webhook_signature(
|
||||
webhook_url,
|
||||
callback_data,
|
||||
x_vobiz_signature,
|
||||
x_vobiz_timestamp,
|
||||
webhook_body,
|
||||
)
|
||||
|
||||
if not is_valid:
|
||||
logger.warning(
|
||||
f"[run {workflow_run_id}] Invalid Vobiz ring callback signature"
|
||||
)
|
||||
return {"status": "error", "reason": "invalid_signature"}
|
||||
|
||||
logger.info(f"[run {workflow_run_id}] Vobiz ring callback signature verified")
|
||||
else:
|
||||
# Get workflow run for processing (signature verification already got it if needed)
|
||||
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
|
||||
if not workflow_run:
|
||||
logger.warning(
|
||||
f"[run {workflow_run_id}] Workflow run not found for Vobiz ring callback"
|
||||
|
|
@ -707,3 +1095,272 @@ async def handle_cloudonix_status_callback(
|
|||
await _process_status_update(workflow_run_id, status_update, workflow_run)
|
||||
|
||||
return {"status": "success"}
|
||||
|
||||
|
||||
@router.post("/vobiz/hangup-callback/workflow/{workflow_id}")
|
||||
async def handle_vobiz_hangup_callback_by_workflow(
|
||||
workflow_id: int,
|
||||
request: Request,
|
||||
x_vobiz_signature: Optional[str] = Header(None),
|
||||
x_vobiz_timestamp: Optional[str] = Header(None),
|
||||
):
|
||||
"""Handle Vobiz hangup callback with workflow_id - finds workflow run by call_id."""
|
||||
|
||||
all_headers = dict(request.headers)
|
||||
logger.info(
|
||||
f"[workflow {workflow_id}] Vobiz hangup callback - Headers: {json.dumps(all_headers)}"
|
||||
)
|
||||
|
||||
try:
|
||||
callback_data, _ = await parse_webhook_request(request)
|
||||
except ValueError:
|
||||
callback_data = {}
|
||||
|
||||
call_uuid = callback_data.get("CallUUID") or callback_data.get("call_uuid")
|
||||
logger.info(
|
||||
f"[workflow {workflow_id}] Received Vobiz hangup callback for call {call_uuid}: {json.dumps(callback_data)}"
|
||||
)
|
||||
|
||||
if not call_uuid:
|
||||
logger.warning(
|
||||
f"[workflow {workflow_id}] No call_uuid found in Vobiz hangup callback"
|
||||
)
|
||||
return {"status": "error", "message": "No call_uuid found"}
|
||||
|
||||
workflow_client = WorkflowClient()
|
||||
workflow = await workflow_client.get_workflow_by_id(workflow_id)
|
||||
if not workflow:
|
||||
logger.warning(f"[workflow {workflow_id}] Workflow not found")
|
||||
return {"status": "error", "message": "workflow_not_found"}
|
||||
|
||||
provider = await get_telephony_provider(workflow.organization_id)
|
||||
|
||||
if x_vobiz_signature:
|
||||
raw_body = await request.body()
|
||||
webhook_body = raw_body.decode("utf-8")
|
||||
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
|
||||
webhook_url = f"https://{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/workflow/{workflow_id}"
|
||||
|
||||
is_valid = await provider.verify_webhook_signature(
|
||||
webhook_url,
|
||||
callback_data,
|
||||
x_vobiz_signature,
|
||||
x_vobiz_timestamp,
|
||||
webhook_body,
|
||||
)
|
||||
|
||||
if not is_valid:
|
||||
logger.warning(
|
||||
f"[workflow {workflow_id}] Invalid Vobiz hangup callback signature"
|
||||
)
|
||||
return {"status": "error", "message": "invalid_signature"}
|
||||
|
||||
logger.info(
|
||||
f"[workflow {workflow_id}] Vobiz hangup callback signature verified"
|
||||
)
|
||||
|
||||
try:
|
||||
db_client = WorkflowRunClient()
|
||||
async with db_client.async_session() as session:
|
||||
# Fetch workflow run with matching call_id in initial_context
|
||||
query = text("""
|
||||
SELECT id FROM workflow_runs
|
||||
WHERE workflow_id = :workflow_id
|
||||
AND CAST(initial_context AS jsonb) @> CAST(:call_id_json AS jsonb)
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
""")
|
||||
|
||||
result = await session.execute(
|
||||
query,
|
||||
{
|
||||
"workflow_id": workflow_id,
|
||||
"call_id_json": json.dumps({"call_id": call_uuid}),
|
||||
},
|
||||
)
|
||||
workflow_run_row = result.fetchone()
|
||||
|
||||
if not workflow_run_row:
|
||||
logger.warning(
|
||||
f"[workflow {workflow_id}] No workflow run found for call {call_uuid}"
|
||||
)
|
||||
return {"status": "ignored", "reason": "workflow_run_not_found"}
|
||||
|
||||
workflow_run_id = workflow_run_row[0]
|
||||
logger.info(
|
||||
f"[workflow {workflow_id}] Found workflow run {workflow_run_id} for call {call_uuid}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[workflow {workflow_id}] Error finding workflow run for call {call_uuid}: {e}"
|
||||
)
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
try:
|
||||
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
|
||||
if not workflow_run:
|
||||
logger.warning(f"[run {workflow_run_id}] Workflow run not found")
|
||||
return {"status": "ignored", "reason": "workflow_run_not_found"}
|
||||
|
||||
parsed_data = provider.parse_status_callback(callback_data)
|
||||
|
||||
status = StatusCallbackRequest(
|
||||
call_id=parsed_data["call_id"],
|
||||
status=parsed_data["status"],
|
||||
from_number=parsed_data.get("from_number"),
|
||||
to_number=parsed_data.get("to_number"),
|
||||
direction=parsed_data.get("direction"),
|
||||
duration=parsed_data.get("duration"),
|
||||
extra=parsed_data.get("extra", {}),
|
||||
)
|
||||
|
||||
await _process_status_update(workflow_run_id, status, workflow_run)
|
||||
|
||||
logger.info(
|
||||
f"[run {workflow_run_id}] Vobiz hangup callback processed successfully"
|
||||
)
|
||||
return {"status": "success"}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[run {workflow_run_id}] Error processing Vobiz hangup callback: {e}"
|
||||
)
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
|
||||
@router.post("/inbound/{workflow_id}")
|
||||
async def handle_inbound_telephony(
|
||||
workflow_id: int,
|
||||
request: Request,
|
||||
x_twilio_signature: Optional[str] = Header(None),
|
||||
x_vobiz_signature: Optional[str] = Header(None),
|
||||
x_vobiz_timestamp: Optional[str] = Header(None),
|
||||
):
|
||||
"""Handle inbound telephony calls from any supported provider with common processing"""
|
||||
logger.info(f"Inbound call received for workflow_id: {workflow_id}")
|
||||
|
||||
try:
|
||||
webhook_data, data_source = await parse_webhook_request(request)
|
||||
headers = dict(request.headers)
|
||||
|
||||
# Detect provider and normalize data
|
||||
provider_class = await _detect_provider(webhook_data, headers)
|
||||
if not provider_class:
|
||||
logger.error("Unable to detect provider for webhook")
|
||||
return generic_hangup_response()
|
||||
|
||||
normalized_data = normalize_webhook_data(provider_class, webhook_data)
|
||||
|
||||
logger.info(
|
||||
f"Inbound call - Provider: {normalized_data.provider}, Data source: {data_source}"
|
||||
)
|
||||
logger.info(f"Normalized data: {normalized_data}")
|
||||
|
||||
# Validate inbound direction
|
||||
if normalized_data.direction != "inbound":
|
||||
logger.warning(f"Non-inbound call received: {normalized_data.direction}")
|
||||
return generic_hangup_response()
|
||||
|
||||
logger.info(f"Inbound call headers: {dict(request.headers)}")
|
||||
logger.info(f"Twilio signature header: {x_twilio_signature}")
|
||||
logger.info(f"Vobiz signature header: {x_vobiz_signature}")
|
||||
logger.info(f"Vobiz timestamp header: {x_vobiz_timestamp}")
|
||||
|
||||
webhook_body = ""
|
||||
if provider_class.PROVIDER_NAME == "vobiz":
|
||||
webhook_body = data_source
|
||||
logger.info(f"Vobiz inbound call - Body: {json.dumps(webhook_data)}")
|
||||
|
||||
(
|
||||
is_valid,
|
||||
error_type,
|
||||
workflow_context,
|
||||
provider_instance,
|
||||
) = await _validate_inbound_request(
|
||||
workflow_id,
|
||||
provider_class,
|
||||
normalized_data,
|
||||
webhook_data,
|
||||
webhook_body,
|
||||
x_twilio_signature,
|
||||
x_vobiz_signature,
|
||||
x_vobiz_timestamp,
|
||||
)
|
||||
|
||||
if not is_valid:
|
||||
logger.error(f"Request validation failed: {error_type}")
|
||||
return provider_class.generate_validation_error_response(error_type)
|
||||
|
||||
# Check quota before processing
|
||||
user_id = workflow_context["user_id"]
|
||||
quota_result = await check_dograh_quota_by_user_id(user_id)
|
||||
if not quota_result.has_quota:
|
||||
logger.warning(
|
||||
f"User {user_id} has exceeded quota for inbound calls: {quota_result.error_message}"
|
||||
)
|
||||
return provider_class.generate_validation_error_response(
|
||||
TelephonyError.QUOTA_EXCEEDED
|
||||
)
|
||||
|
||||
# Create workflow run
|
||||
workflow_run_id = await _create_inbound_workflow_run(
|
||||
workflow_id,
|
||||
workflow_context["user_id"],
|
||||
workflow_context["provider"],
|
||||
normalized_data,
|
||||
data_source,
|
||||
)
|
||||
|
||||
# Generate response URLs
|
||||
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
|
||||
websocket_url = f"wss://{backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{workflow_context['user_id']}/{workflow_run_id}"
|
||||
response = await provider_class.generate_inbound_response(
|
||||
websocket_url, workflow_run_id
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Generated {normalized_data.provider} response for call {normalized_data.call_id}"
|
||||
)
|
||||
return response
|
||||
|
||||
except ValueError as e:
|
||||
logger.error(f"Request parsing error: {e}")
|
||||
return generic_hangup_response()
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing inbound call: {e}")
|
||||
return generic_hangup_response()
|
||||
|
||||
|
||||
@router.post("/inbound/fallback")
|
||||
async def handle_inbound_fallback(request: Request):
|
||||
"""Fallback endpoint that returns audio message when calls cannot be processed."""
|
||||
|
||||
webhook_data, _ = await parse_webhook_request(request)
|
||||
headers = dict(request.headers)
|
||||
|
||||
# Detect provider
|
||||
provider_class = await _detect_provider(webhook_data, headers)
|
||||
|
||||
if provider_class:
|
||||
# Use provider-specific error response
|
||||
call_id = (
|
||||
webhook_data.get("CallSid")
|
||||
or webhook_data.get("CallUUID")
|
||||
or webhook_data.get("call_uuid")
|
||||
)
|
||||
logger.info(
|
||||
f"[fallback] Received {provider_class.PROVIDER_NAME} callback for call {call_id}: {json.dumps(webhook_data)}"
|
||||
)
|
||||
|
||||
return provider_class.generate_error_response(
|
||||
"SYSTEM_UNAVAILABLE",
|
||||
"Our system is temporarily unavailable. Please try again later.",
|
||||
)
|
||||
else:
|
||||
# Unknown provider - return generic XML
|
||||
logger.info(
|
||||
f"[fallback] Received unknown provider callback: {json.dumps(webhook_data)} and request headers: {json.dumps(headers)}"
|
||||
)
|
||||
|
||||
return generic_hangup_response()
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ from api.constants import DEPLOYMENT_MODE
|
|||
from api.db import db_client
|
||||
from api.db.models import UserModel
|
||||
from api.db.workflow_template_client import WorkflowTemplateClient
|
||||
from api.enums import CallType
|
||||
from api.schemas.workflow import WorkflowRunResponseSchema
|
||||
from api.services.auth.depends import get_user
|
||||
from api.services.mps_service_key_client import mps_service_key_client
|
||||
|
|
@ -141,7 +142,7 @@ class CreateWorkflowRunResponse(BaseModel):
|
|||
|
||||
|
||||
class CreateWorkflowTemplateRequest(BaseModel):
|
||||
call_type: Literal["INBOUND", "OUTBOUND"]
|
||||
call_type: Literal[CallType.INBOUND.value, CallType.OUTBOUND.value]
|
||||
use_case: str
|
||||
activity_description: str
|
||||
|
||||
|
|
@ -289,7 +290,7 @@ async def create_workflow_from_template(
|
|||
# Call MPS API to generate workflow using the client
|
||||
if DEPLOYMENT_MODE == "oss":
|
||||
workflow_data = await mps_service_key_client.call_workflow_api(
|
||||
call_type=request.call_type,
|
||||
call_type=request.call_type.upper(),
|
||||
use_case=request.use_case,
|
||||
activity_description=request.activity_description,
|
||||
created_by=str(user.provider_id),
|
||||
|
|
@ -299,7 +300,7 @@ async def create_workflow_from_template(
|
|||
raise HTTPException(status_code=400, detail="No organization selected")
|
||||
|
||||
workflow_data = await mps_service_key_client.call_workflow_api(
|
||||
call_type=request.call_type,
|
||||
call_type=request.call_type.upper(),
|
||||
use_case=request.use_case,
|
||||
activity_description=request.activity_description,
|
||||
organization_id=user.selected_organization_id,
|
||||
|
|
@ -609,6 +610,7 @@ async def get_workflow_run(
|
|||
"definition_id": run.definition_id,
|
||||
"initial_context": run.initial_context,
|
||||
"gathered_context": run.gathered_context,
|
||||
"call_type": run.call_type,
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue