dograh/api/routes/telephony.py
Abhishek Kumar 6d93be3ef6 fix: number pool initialization in multi telephony setup
If there are multiple telephony configurations, the form number should be initialized from the campaigns given telephonic configuration rather than the organization default telephonic configuration.
2026-05-08 14:48:53 +05:30

1024 lines
39 KiB
Python

"""
Telephony routes - handles all telephony-related endpoints.
Consolidated from split modules for easier maintenance.
"""
import json
import uuid
from typing import Optional
from fastapi import (
APIRouter,
Depends,
HTTPException,
Request,
WebSocket,
)
from loguru import logger
from pipecat.utils.run_context import set_current_run_id
from pydantic import BaseModel
from starlette.websockets import WebSocketDisconnect
from api.db import db_client
from api.db.models import UserModel
from api.enums import CallType, WorkflowRunState
from api.errors.telephony_errors import TelephonyError
from api.sdk_expose import sdk_expose
from api.services.auth.depends import get_user
from api.services.quota_service import check_dograh_quota, check_dograh_quota_by_user_id
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
from api.services.telephony.factory import (
get_all_telephony_providers,
get_default_telephony_provider,
get_telephony_provider_by_id,
get_telephony_provider_for_run,
)
from api.services.telephony.transfer_event_protocol import (
TransferEvent,
TransferEventType,
)
from api.utils.common import get_backend_endpoints
from api.utils.telephony_helper import (
generic_hangup_response,
normalize_webhook_data,
numbers_match,
parse_webhook_request,
)
router = APIRouter(prefix="/telephony")
class InitiateCallRequest(BaseModel):
workflow_id: int
workflow_run_id: int | None = None
phone_number: str | None = None
# Optional explicit telephony config to use for the test call. If omitted,
# falls back to the user's per-user default (when set), then the org default.
telephony_configuration_id: int | None = None
# Optional caller-ID phone number to dial out from. Must belong to the
# resolved telephony configuration; otherwise the provider picks one.
from_phone_number_id: int | None = None
@router.post(
"/initiate-call",
**sdk_expose(
method="test_phone_call",
description="Place a test call from a workflow to a phone number.",
),
)
async def initiate_call(
request: InitiateCallRequest, user: UserModel = Depends(get_user)
):
"""Initiate a call using the configured telephony provider from web browser. This is
supposed to be a test call method for the draft version of the agent."""
user_configuration = await db_client.get_user_configurations(user.id)
# Resolve which telephony config to use: explicit request value, otherwise
# the org's default outbound config.
telephony_configuration_id = request.telephony_configuration_id
if telephony_configuration_id:
try:
provider = await get_telephony_provider_by_id(
telephony_configuration_id, user.selected_organization_id
)
except ValueError:
raise HTTPException(
status_code=400, detail="telephony_configuration_not_found"
)
else:
try:
provider = await get_default_telephony_provider(
user.selected_organization_id
)
except ValueError:
raise HTTPException(status_code=400, detail="telephony_not_configured")
default_cfg = await db_client.get_default_telephony_configuration(
user.selected_organization_id
)
telephony_configuration_id = default_cfg.id if default_cfg else None
# Validate provider is configured
if not provider.validate_config():
raise HTTPException(
status_code=400,
detail="telephony_not_configured",
)
# Check Dograh quota before initiating the call (apply per-workflow
# model_overrides so the keys we will actually use are the ones checked).
quota_result = await check_dograh_quota(user, workflow_id=request.workflow_id)
if not quota_result.has_quota:
raise HTTPException(status_code=402, detail=quota_result.error_message)
# Determine the workflow run mode based on provider type
workflow_run_mode = provider.PROVIDER_NAME
phone_number = request.phone_number or user_configuration.test_phone_number
if not phone_number:
raise HTTPException(
status_code=400,
detail="Phone number must be provided in request or set in user "
"configuration",
)
workflow_run_id = request.workflow_run_id
if not workflow_run_id:
# Fetch workflow to merge template context variables (e.g. caller_number,
# called_number set in workflow settings for testing pre-call data fetch)
workflow = await db_client.get_workflow(
request.workflow_id, organization_id=user.selected_organization_id
)
if not workflow:
raise HTTPException(status_code=404, detail="Workflow not found")
template_vars = workflow.template_context_variables or {}
numeric_suffix = int(str(uuid.uuid4()).replace("-", "")[:8], 16) % 100000000
workflow_run_name = f"WR-TEL-OUT-{numeric_suffix:08d}"
workflow_run = await db_client.create_workflow_run(
workflow_run_name,
request.workflow_id,
workflow_run_mode,
user_id=user.id,
call_type=CallType.OUTBOUND,
initial_context={
**template_vars,
"phone_number": phone_number,
"called_number": phone_number,
"provider": provider.PROVIDER_NAME,
"telephony_configuration_id": telephony_configuration_id,
},
use_draft=True,
)
workflow_run_id = workflow_run.id
else:
workflow_run = await db_client.get_workflow_run(workflow_run_id, user.id)
if not workflow_run:
raise HTTPException(status_code=400, detail="Workflow run not found")
workflow_run_name = workflow_run.name
# Construct webhook URL based on provider type
backend_endpoint, _ = await get_backend_endpoints()
webhook_endpoint = provider.WEBHOOK_ENDPOINT
webhook_url = (
f"{backend_endpoint}/api/v1/telephony/{webhook_endpoint}"
f"?workflow_id={request.workflow_id}"
f"&user_id={user.id}"
f"&workflow_run_id={workflow_run_id}"
f"&organization_id={user.selected_organization_id}"
)
keywords = {"workflow_id": request.workflow_id, "user_id": user.id}
# Resolve optional caller-ID. The config has already been validated against
# the user's organization, so filtering by config_id is sufficient for
# tenant isolation.
from_number: str | None = None
if request.from_phone_number_id is not None:
if telephony_configuration_id is None:
raise HTTPException(
status_code=400,
detail="from_phone_number_id_requires_telephony_configuration",
)
phone_row = await db_client.get_phone_number_for_config(
request.from_phone_number_id, telephony_configuration_id
)
if not phone_row or not phone_row.is_active:
raise HTTPException(status_code=400, detail="from_phone_number_not_found")
from_number = phone_row.address_normalized
# Initiate call via provider
result = await provider.initiate_call(
to_number=phone_number,
webhook_url=webhook_url,
workflow_run_id=workflow_run_id,
from_number=from_number,
**keywords,
)
# Store provider metadata and caller_number in workflow run context
gathered_context = {
"provider": provider.PROVIDER_NAME,
**(result.provider_metadata or {}),
}
# Merge caller_number into initial_context now that we know which number was used
updated_initial_context = {
**(workflow_run.initial_context or {}),
"called_number": phone_number,
"telephony_configuration_id": telephony_configuration_id,
}
if result.caller_number:
updated_initial_context["caller_number"] = result.caller_number
await db_client.update_workflow_run(
run_id=workflow_run_id,
gathered_context=gathered_context,
initial_context=updated_initial_context,
)
return {"message": f"Call initiated successfully with run name {workflow_run_name}"}
async def _verify_organization_phone_number(
phone_number: str,
organization_id: int,
telephony_configuration_id: int,
provider: str,
to_country: str = None,
from_country: str = None,
) -> Optional[int]:
"""Verify the called number is registered to the matched config and return
its ``telephony_phone_numbers.id``, or None when no row matches.
Primary path: deterministic E.164 / SIP lookup via the new phone-number table.
Legacy fallback: ``numbers_match()`` over the matched config's active numbers,
so non-E.164 rows that survived the migration still route correctly.
"""
try:
match = await db_client.find_active_phone_number_for_inbound(
organization_id, phone_number, provider, country_hint=to_country
)
if match and match.telephony_configuration_id == telephony_configuration_id:
logger.info(
f"Phone number {phone_number} matched row {match.id} for org "
f"{organization_id} / config {telephony_configuration_id}"
)
return match.id
# Legacy fallback: scan the matched config's active numbers and apply
# the country-aware fuzzy matcher (covers non-E.164 storage).
rows = await db_client.list_phone_numbers_for_config(telephony_configuration_id)
for row in rows:
if not row.is_active:
continue
if numbers_match(phone_number, row.address, to_country, from_country):
logger.info(
f"Phone number {phone_number} matched (fuzzy) row {row.id} "
f"for config {telephony_configuration_id}"
)
return row.id
logger.warning(
f"Phone number {phone_number} not registered to config "
f"{telephony_configuration_id} (org={organization_id}, "
f"to_country={to_country}, from_country={from_country})"
)
return None
except Exception as e:
logger.error(
f"Error verifying phone number {phone_number} for organization "
f"{organization_id} / config {telephony_configuration_id}: {e}"
)
return None
async def _detect_provider(webhook_data: dict, headers: dict):
"""Detect which telephony provider can handle this webhook"""
provider_classes = await get_all_telephony_providers()
for provider_class in provider_classes:
if provider_class.can_handle_webhook(webhook_data, headers):
return provider_class
logger.warning(f"No provider found for webhook data: {webhook_data.keys()}")
return None
async def _validate_inbound_request(
workflow_id: int,
provider_class,
normalized_data,
webhook_data: dict,
headers: dict,
raw_body: str = "",
) -> tuple[bool, TelephonyError, dict, object]:
"""
Validate all aspects of inbound request.
Returns: (is_valid, error_type, workflow_context, provider_instance)
"""
from api.services.telephony import registry as telephony_registry
workflow = await db_client.get_workflow(workflow_id)
if not workflow:
return False, TelephonyError.WORKFLOW_NOT_FOUND, {}, None
organization_id = workflow.organization_id
user_id = workflow.user_id
provider = normalized_data.provider
# Primary path: one combined query that resolves config + phone number
# together (joins configs and phone_numbers with provider, account_id,
# and called-number filters). Falls back to the two-step config-then-
# phone resolution to cover providers without account_id (ARI) and
# legacy non-E.164 stored addresses.
spec = telephony_registry.get_optional(provider_class.PROVIDER_NAME)
account_field = spec.account_id_credential_field if spec else ""
telephony_configuration_id: Optional[int] = None
phone_number_id: Optional[int] = None
if account_field and normalized_data.account_id:
match = await db_client.find_inbound_route_by_account(
provider=provider_class.PROVIDER_NAME,
account_id_field=account_field,
account_id=normalized_data.account_id,
to_number=normalized_data.to_number,
country_hint=normalized_data.to_country,
organization_id=organization_id,
)
if match:
cfg_row, phone_row = match
telephony_configuration_id = cfg_row.id
phone_number_id = phone_row.id
if telephony_configuration_id is None:
(
validation_result,
telephony_configuration_id,
) = await _resolve_inbound_telephony_config(
organization_id, provider_class, normalized_data.account_id
)
if validation_result != TelephonyError.VALID:
return False, validation_result, {}, None
phone_number_id = await _verify_organization_phone_number(
normalized_data.to_number,
organization_id,
telephony_configuration_id,
provider_class.PROVIDER_NAME,
normalized_data.to_country,
normalized_data.from_country,
)
if phone_number_id is None:
return False, TelephonyError.PHONE_NUMBER_NOT_CONFIGURED, {}, None
# Verify webhook signature using the matched config's credentials. The
# provider extracts its own signature/timestamp/nonce headers from the
# dict, so this dispatcher stays generic.
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = f"{backend_endpoint}/api/v1/telephony/inbound/{workflow_id}"
provider_instance = await get_telephony_provider_by_id(
telephony_configuration_id, organization_id
)
signature_valid = await provider_instance.verify_inbound_signature(
webhook_url, webhook_data, headers, raw_body
)
logger.info(f"Signature validation for {provider}: {signature_valid}")
if not signature_valid:
return (
False,
TelephonyError.SIGNATURE_VALIDATION_FAILED,
{},
provider_instance,
)
# Return success with workflow context
workflow_context = {
"workflow": workflow,
"organization_id": organization_id,
"user_id": user_id,
"provider": provider,
"telephony_configuration_id": telephony_configuration_id,
"from_phone_number_id": phone_number_id,
}
return (True, "", workflow_context, provider_instance)
async def _create_inbound_workflow_run(
workflow_id: int,
user_id: int,
provider: str,
normalized_data,
telephony_configuration_id: int,
from_phone_number_id: Optional[int] = None,
) -> int:
"""Create workflow run for inbound call and return run ID"""
call_id = normalized_data.call_id
numeric_suffix = int(str(uuid.uuid4()).replace("-", "")[:8], 16) % 100000000
workflow_run_name = f"WR-TEL-IN-{numeric_suffix:08d}"
workflow_run = await db_client.create_workflow_run(
workflow_run_name,
workflow_id,
provider, # Use detected provider as mode
user_id=user_id,
call_type=CallType.INBOUND,
initial_context={
"caller_number": normalized_data.from_number,
"called_number": normalized_data.to_number,
"direction": "inbound",
"provider": provider,
"telephony_configuration_id": telephony_configuration_id,
},
gathered_context={
"call_id": call_id,
},
logs={
"inbound_webhook": {
"account_id": normalized_data.account_id,
"from_country": normalized_data.from_country,
"to_country": normalized_data.to_country,
"from_phone_number_id": from_phone_number_id,
"raw_webhook_data": normalized_data.raw_data,
},
},
)
logger.info(
f"Created inbound workflow run {workflow_run.id} for {provider} call {call_id}"
)
return workflow_run.id
async def _resolve_inbound_telephony_config(
organization_id: int, provider_class, account_id: str
) -> tuple[TelephonyError, Optional[int]]:
"""Find which of the org's telephony configs the inbound webhook came from.
Returns ``(VALID, config_id)`` on success or ``(error, None)`` otherwise.
Replaces the single-config check that assumed one provider per org.
"""
from api.services.telephony.factory import find_telephony_config_for_inbound
try:
candidates = await db_client.list_telephony_configurations_by_provider(
organization_id, provider_class.PROVIDER_NAME
)
if not candidates:
logger.warning(
f"No {provider_class.PROVIDER_NAME} configuration for org "
f"{organization_id}"
)
return TelephonyError.PROVIDER_MISMATCH, None
match = await find_telephony_config_for_inbound(
organization_id, provider_class.PROVIDER_NAME, account_id
)
if not match:
logger.warning(
f"Account validation failed for {provider_class.PROVIDER_NAME}: "
f"webhook account_id={account_id} (org {organization_id})"
)
return TelephonyError.ACCOUNT_VALIDATION_FAILED, None
config_id, _ = match
return TelephonyError.VALID, config_id
except Exception as e:
logger.error(f"Exception during account validation: {e}")
return TelephonyError.ACCOUNT_VALIDATION_FAILED, None
@router.websocket("/ws/ari")
async def websocket_ari_endpoint(websocket: WebSocket):
"""WebSocket endpoint for ARI chan_websocket external media.
Asterisk connects here via chan_websocket. Routing params are passed as
query params (appended by the v() dial string option in externalMedia).
"""
workflow_id = websocket.query_params.get("workflow_id")
user_id = websocket.query_params.get("user_id")
workflow_run_id = websocket.query_params.get("workflow_run_id")
if not workflow_id or not user_id or not workflow_run_id:
logger.error(
f"ARI WebSocket missing query params: "
f"workflow_id={workflow_id}, user_id={user_id}, workflow_run_id={workflow_run_id}"
)
await websocket.close(code=4400, reason="Missing required query params")
return
# Accept with "media" subprotocol — chan_websocket sends
# Sec-WebSocket-Protocol: media and requires it echoed back.
await websocket.accept(subprotocol="media")
await _handle_telephony_websocket(
websocket, int(workflow_id), int(user_id), int(workflow_run_id)
)
@router.websocket("/ws/{workflow_id}/{user_id}/{workflow_run_id}")
async def websocket_endpoint(
websocket: WebSocket, workflow_id: int, user_id: int, workflow_run_id: int
):
"""WebSocket endpoint for real-time call handling - routes to provider-specific handlers."""
await websocket.accept()
await _handle_telephony_websocket(websocket, workflow_id, user_id, workflow_run_id)
async def _handle_telephony_websocket(
websocket: WebSocket, workflow_id: int, user_id: int, workflow_run_id: int
):
"""Shared WebSocket handler logic (connection already accepted)."""
try:
# Set the run context
set_current_run_id(workflow_run_id)
# Get workflow run to determine provider type
workflow_run = await db_client.get_workflow_run(workflow_run_id)
if not workflow_run:
logger.error(f"Workflow run {workflow_run_id} not found")
await websocket.close(code=4404, reason="Workflow run not found")
return
# Get workflow for organization info
workflow = await db_client.get_workflow(workflow_id)
if not workflow:
logger.error(f"Workflow {workflow_id} not found")
await websocket.close(code=4404, reason="Workflow not found")
return
# Check workflow run state - only allow 'initialized' state
if workflow_run.state != WorkflowRunState.INITIALIZED.value:
logger.warning(
f"Workflow run {workflow_run_id} not in initialized state: {workflow_run.state}"
)
await websocket.close(
code=4409, reason="Workflow run not available for connection"
)
return
# Extract provider type from workflow run context
provider_type = None
logger.info(
f"Workflow run {workflow_run_id} gathered_context: {workflow_run.gathered_context}"
)
logger.info(f"Workflow run {workflow_run_id} mode: {workflow_run.mode}")
if workflow_run.initial_context:
provider_type = workflow_run.initial_context.get("provider")
logger.info(f"Extracted provider_type: {provider_type}")
if not provider_type:
logger.error(
f"No provider type found in workflow run {workflow_run_id}. "
f"gathered_context: {workflow_run.gathered_context}, mode: {workflow_run.mode}"
)
await websocket.close(code=4400, reason="Provider type not found")
return
logger.info(
f"WebSocket connected for {provider_type} provider, workflow_run {workflow_run_id}"
)
provider = await get_telephony_provider_for_run(
workflow_run, workflow.organization_id
)
# Verify the provider matches what was stored
if provider.PROVIDER_NAME != provider_type:
logger.error(
f"Provider mismatch: expected {provider_type}, got {provider.PROVIDER_NAME}"
)
await websocket.close(code=4400, reason="Provider mismatch")
return
# Set workflow run state to 'running' before starting the pipeline
await db_client.update_workflow_run(
run_id=workflow_run_id, state=WorkflowRunState.RUNNING.value
)
logger.info(
f"[run {workflow_run_id}] Set workflow run state to 'running' for {provider_type} provider"
)
# Delegate to provider-specific handler
await provider.handle_websocket(
websocket, workflow_id, user_id, workflow_run_id
)
except WebSocketDisconnect as e:
logger.info(f"WebSocket disconnected: code={e.code}, reason={e.reason}")
except Exception as e:
logger.error(f"Error in WebSocket connection: {e}")
try:
await websocket.close(1011, "Internal server error")
except RuntimeError:
# WebSocket already closed, ignore
pass
@router.post("/inbound/run")
async def handle_inbound_run(request: Request):
"""Workflow-agnostic inbound dispatcher.
All providers can point a single webhook at this endpoint instead of one
URL per workflow. The dispatcher resolves the org from the webhook's
account_id and the workflow from the called number's
``inbound_workflow_id``. This is what ``configure_inbound`` writes into
each provider's resource so per-workflow webhook bookkeeping disappears.
Provider-specific signature/timestamp headers are not enumerated here —
each provider's ``verify_inbound_signature`` reads its own headers from
the dict, so adding a new provider doesn't require changes to this route.
"""
from api.services.telephony import registry as telephony_registry
logger.info("Inbound /run dispatch received")
try:
webhook_data, raw_body = await parse_webhook_request(request)
headers = dict(request.headers)
provider_class = await _detect_provider(webhook_data, headers)
if not provider_class:
logger.error("Unable to detect provider for /inbound/run webhook")
return generic_hangup_response()
normalized_data = normalize_webhook_data(provider_class, webhook_data)
logger.info(
f"/inbound/run normalized data — provider={normalized_data.provider} "
f"to={normalized_data.to_number} from={normalized_data.from_number}"
)
if normalized_data.direction != "inbound":
logger.warning(
f"Non-inbound call on /inbound/run: {normalized_data.direction}"
)
return generic_hangup_response()
# 1. Resolve (config, phone_number) in a single SQL roundtrip that
# joins telephony_configurations and telephony_phone_numbers and
# filters on (provider, credentials[account_id_field], called number
# canonical address, is_active). The phone-number row's existence in
# the matched config simultaneously identifies the org — we never
# match a config from one org against a phone owned by another.
spec = telephony_registry.get_optional(provider_class.PROVIDER_NAME)
account_field = spec.account_id_credential_field if spec else ""
match = await db_client.find_inbound_route_by_account(
provider=provider_class.PROVIDER_NAME,
account_id_field=account_field,
account_id=normalized_data.account_id or "",
to_number=normalized_data.to_number,
country_hint=normalized_data.to_country,
)
if not match:
logger.warning(
f"/inbound/run: no inbound route matched "
f"provider={provider_class.PROVIDER_NAME} "
f"account_id={normalized_data.account_id} "
f"to={normalized_data.to_number}"
)
return provider_class.generate_validation_error_response(
TelephonyError.PHONE_NUMBER_NOT_CONFIGURED
)
config, phone_row = match
telephony_configuration_id = config.id
if not phone_row.inbound_workflow_id:
logger.warning(
f"/inbound/run: number {normalized_data.to_number} has no "
f"inbound_workflow_id assigned"
)
return provider_class.generate_validation_error_response(
TelephonyError.WORKFLOW_NOT_FOUND
)
workflow_id = phone_row.inbound_workflow_id
workflow = await db_client.get_workflow(
workflow_id, organization_id=config.organization_id
)
if not workflow:
logger.warning(
f"/inbound/run: workflow not found {workflow_id} for org {config.organization_id}"
)
return provider_class.generate_validation_error_response(
TelephonyError.WORKFLOW_NOT_FOUND
)
user_id = workflow.user_id
# 3. Verify webhook signature against the matched config's credentials.
backend_endpoint, wss_backend_endpoint = await get_backend_endpoints()
webhook_url = f"{backend_endpoint}/api/v1/telephony/inbound/run"
provider_instance = await get_telephony_provider_by_id(
telephony_configuration_id, config.organization_id
)
signature_valid = await provider_instance.verify_inbound_signature(
webhook_url, webhook_data, headers, raw_body
)
if not signature_valid:
logger.warning(
f"/inbound/run: signature validation failed for "
f"{provider_class.PROVIDER_NAME}"
)
return provider_class.generate_validation_error_response(
TelephonyError.SIGNATURE_VALIDATION_FAILED
)
# 4. Quota check (use the workflow's model_overrides if set).
quota_result = await check_dograh_quota_by_user_id(
user_id, workflow_id=workflow_id
)
if not quota_result.has_quota:
logger.warning(
f"User {user_id} has exceeded quota: {quota_result.error_message}"
)
return provider_class.generate_validation_error_response(
TelephonyError.QUOTA_EXCEEDED
)
# 5. Create workflow run + return provider-shaped response.
workflow_run_id = await _create_inbound_workflow_run(
workflow_id,
user_id,
provider_class.PROVIDER_NAME,
normalized_data,
telephony_configuration_id=telephony_configuration_id,
from_phone_number_id=phone_row.id,
)
websocket_url = (
f"{wss_backend_endpoint}/api/v1/telephony/ws/"
f"{workflow_id}/{user_id}/{workflow_run_id}"
)
return await provider_instance.start_inbound_stream(
websocket_url=websocket_url,
workflow_run_id=workflow_run_id,
normalized_data=normalized_data,
backend_endpoint=backend_endpoint,
)
except ValueError as e:
logger.error(f"/inbound/run request parsing error: {e}")
return generic_hangup_response()
except Exception as e:
logger.error(f"/inbound/run unexpected error: {e}")
return generic_hangup_response()
@router.post("/inbound/fallback")
async def handle_inbound_fallback(request: Request):
"""Fallback endpoint that returns audio message when calls cannot be processed."""
webhook_data, _ = await parse_webhook_request(request)
headers = dict(request.headers)
# Detect provider
provider_class = await _detect_provider(webhook_data, headers)
if provider_class:
# Use provider-specific error response
call_id = (
webhook_data.get("CallSid")
or webhook_data.get("CallUUID")
or webhook_data.get("call_uuid")
)
logger.info(
f"[fallback] Received {provider_class.PROVIDER_NAME} callback for call {call_id}: {json.dumps(webhook_data)}"
)
return provider_class.generate_error_response(
"SYSTEM_UNAVAILABLE",
"Our system is temporarily unavailable. Please try again later.",
)
else:
# Unknown provider - return generic XML
logger.info(
f"[fallback] Received unknown provider callback: {json.dumps(webhook_data)} and request headers: {json.dumps(headers)}"
)
return generic_hangup_response()
@router.post("/inbound/{workflow_id}", deprecated=True)
async def handle_inbound_telephony(
workflow_id: int,
request: Request,
):
"""[LEGACY] Per-workflow inbound webhook.
Superseded by ``POST /inbound/run``, which resolves the workflow from
the called number's ``inbound_workflow_id`` and lets a single webhook
URL serve every workflow in the org. New integrations should point
their provider at ``/inbound/run``; this route is kept only for
existing provider configurations that still encode ``workflow_id``
in the URL.
"""
logger.info(
f"[legacy /inbound/{{workflow_id}}] Inbound call received for workflow_id: {workflow_id}"
)
try:
webhook_data, raw_body = await parse_webhook_request(request)
logger.info(f"Inbound call data: {dict(webhook_data)}")
headers = dict(request.headers)
# Detect provider and normalize data
provider_class = await _detect_provider(webhook_data, headers)
if not provider_class:
logger.error("Unable to detect provider for webhook")
return generic_hangup_response()
normalized_data = normalize_webhook_data(provider_class, webhook_data)
logger.info(f"Inbound call - Provider: {normalized_data.provider}")
logger.info(f"Normalized data: {normalized_data}")
# Validate inbound direction
if normalized_data.direction != "inbound":
logger.warning(f"Non-inbound call received: {normalized_data.direction}")
return generic_hangup_response()
(
is_valid,
error_type,
workflow_context,
provider_instance,
) = await _validate_inbound_request(
workflow_id,
provider_class,
normalized_data,
webhook_data,
headers,
raw_body,
)
if not is_valid:
logger.error(f"Request validation failed: {error_type}")
return provider_class.generate_validation_error_response(error_type)
# Check quota before processing (apply per-workflow model_overrides).
user_id = workflow_context["user_id"]
quota_result = await check_dograh_quota_by_user_id(
user_id, workflow_id=workflow_id
)
if not quota_result.has_quota:
logger.warning(
f"User {user_id} has exceeded quota for inbound calls: {quota_result.error_message}"
)
return provider_class.generate_validation_error_response(
TelephonyError.QUOTA_EXCEEDED
)
# Create workflow run
workflow_run_id = await _create_inbound_workflow_run(
workflow_id,
workflow_context["user_id"],
workflow_context["provider"],
normalized_data,
telephony_configuration_id=workflow_context["telephony_configuration_id"],
from_phone_number_id=workflow_context.get("from_phone_number_id"),
)
# Generate response URLs
backend_endpoint, wss_backend_endpoint = await get_backend_endpoints()
websocket_url = f"{wss_backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{workflow_context['user_id']}/{workflow_run_id}"
response = await provider_instance.start_inbound_stream(
websocket_url=websocket_url,
workflow_run_id=workflow_run_id,
normalized_data=normalized_data,
backend_endpoint=backend_endpoint,
)
logger.info(
f"Generated {normalized_data.provider} response for call {normalized_data.call_id}"
)
return response
except ValueError as e:
logger.error(f"Request parsing error: {e}")
return generic_hangup_response()
except Exception as e:
logger.error(f"Error processing inbound call: {e}")
return generic_hangup_response()
@router.post("/transfer-result/{transfer_id}")
async def complete_transfer_function_call(transfer_id: str, request: Request):
"""Webhook endpoint to complete the function call with transfer result.
Called by Twilio's StatusCallback when the transfer call status changes.
"""
form_data = await request.form()
data = dict(form_data)
call_status = data.get("CallStatus", "")
call_sid = data.get("CallSid", "")
logger.info(
f"Transfer result(call status) webhook: {transfer_id} status={call_status}"
)
# Get transfer context from Redis for additional information
call_transfer_manager = await get_call_transfer_manager()
transfer_context = await call_transfer_manager.get_transfer_context(transfer_id)
original_call_sid = transfer_context.original_call_sid if transfer_context else None
conference_name = transfer_context.conference_name if transfer_context else None
# Determine the result based on call status with user-friendly messaging
if call_status in ("in-progress", "answered"):
result = {
"status": "success",
"message": "Great! The destination number answered. Let me transfer you now.",
"action": "destination_answered",
"conference_id": conference_name,
"transfer_call_sid": call_sid, # The outbound transfer call SID
"original_call_sid": original_call_sid, # The original caller's SID
"end_call": False, # Continue with transfer
}
elif call_status == "no-answer":
result = {
"status": "transfer_failed",
"reason": "no_answer",
"message": "The transfer call was not answered. The person may be busy or unavailable right now.",
"action": "transfer_failed",
"call_sid": call_sid,
"end_call": True,
}
elif call_status == "busy":
result = {
"status": "transfer_failed",
"reason": "busy",
"message": "The transfer call encountered a busy signal. The person is likely on another call.",
"action": "transfer_failed",
"call_sid": call_sid,
"end_call": True,
}
elif call_status == "failed":
result = {
"status": "transfer_failed",
"reason": "call_failed",
"message": "The transfer call failed to connect. There may be a network issue or the number is unavailable.",
"action": "transfer_failed",
"call_sid": call_sid,
"end_call": True,
}
else:
# Intermediate status (ringing, in-progress, etc.), don't complete yet
logger.info(
f"Received intermediate status {call_status}, waiting for final status"
)
return {"status": "pending"}
# Complete the function call with Redis event publishing
try:
# Determine event type based on result status
if result["status"] == "success":
event_type = TransferEventType.DESTINATION_ANSWERED
else:
event_type = TransferEventType.TRANSFER_FAILED
transfer_event = TransferEvent(
type=event_type,
transfer_id=transfer_id,
original_call_sid=original_call_sid or "",
transfer_call_sid=call_sid,
conference_name=conference_name,
message=result.get("message", ""),
status=result["status"],
action=result.get("action", ""),
reason=result.get("reason"),
)
# Publish the event via Redis
await call_transfer_manager.publish_transfer_event(transfer_event)
logger.info(
f"Published {event_type} event for {transfer_id} with result: {result['status']}"
)
except Exception as e:
logger.error(f"Error completing transfer {transfer_id}: {e}")
return {"status": "completed", "result": result}
# Mount per-provider routers (webhook, status callbacks, answer URLs).
#
# Each provider's routes live at ``providers/<name>/routes.py`` and expose
# a module-level ``router``. We discover them through the registry rather
# than pre-importing them from each provider's __init__.py so that the
# (heavy) route module — which transitively depends on status_processor,
# campaign helpers, etc. — is only loaded when the HTTP layer is actually
# being wired up, not when someone merely asks for a TelephonyProvider
# class. This is what keeps the package init free of cycles.
def _mount_provider_routers() -> None:
import importlib
from api.services.telephony import registry as _telephony_registry
for spec in _telephony_registry.all_specs():
try:
module = importlib.import_module(
f"api.services.telephony.providers.{spec.name}.routes"
)
except ModuleNotFoundError:
# Provider has no routes (e.g. ARI, which only has a WebSocket).
continue
provider_router = getattr(module, "router", None)
if provider_router is not None:
router.include_router(provider_router)
_mount_provider_routers()