feat: handle cloudonix incoming calls

This commit is contained in:
Sabiha Khan 2026-01-16 17:46:44 +05:30
parent a172db8022
commit 7fd3bb765d
5 changed files with 186 additions and 59 deletions

View file

@ -278,6 +278,7 @@ async def _validate_inbound_request(
x_twilio_signature: str = None, x_twilio_signature: str = None,
x_vobiz_signature: str = None, x_vobiz_signature: str = None,
x_vobiz_timestamp: str = None, x_vobiz_timestamp: str = None,
x_cx_apikey: str = None,
) -> tuple[bool, TelephonyError, dict, object]: ) -> tuple[bool, TelephonyError, dict, object]:
""" """
Validate all aspects of inbound request. Validate all aspects of inbound request.
@ -309,9 +310,9 @@ async def _validate_inbound_request(
if not is_valid: if not is_valid:
return False, TelephonyError.PHONE_NUMBER_NOT_CONFIGURED, {}, None return False, TelephonyError.PHONE_NUMBER_NOT_CONFIGURED, {}, None
# Verify webhook signature if provided # Verify webhook signature/API key if provided
provider_instance = None provider_instance = None
if x_twilio_signature or x_vobiz_signature: if x_twilio_signature or x_vobiz_signature or x_cx_apikey:
backend_endpoint = await TunnelURLProvider.get_tunnel_url() backend_endpoint = await TunnelURLProvider.get_tunnel_url()
webhook_url = ( webhook_url = (
f"https://{backend_endpoint}/api/v1/telephony/inbound/{workflow_id}" f"https://{backend_endpoint}/api/v1/telephony/inbound/{workflow_id}"
@ -334,13 +335,18 @@ async def _validate_inbound_request(
x_vobiz_timestamp, x_vobiz_timestamp,
webhook_body, webhook_body,
) )
elif provider_class.PROVIDER_NAME == "cloudonix" and x_cx_apikey:
logger.info(f"Verifying Cloudonix API key for URL: {webhook_url}")
signature_valid = await provider_instance.verify_inbound_signature(
webhook_url, webhook_data, x_cx_apikey
)
else: else:
logger.warning( logger.warning(
f"No signature validation for provider {provider_class.PROVIDER_NAME}" f"No signature/API key validation for provider {provider_class.PROVIDER_NAME}"
) )
signature_valid = True signature_valid = True
logger.info(f"Signature validation result: {signature_valid}") logger.info(f"Signature/API key validation result: {signature_valid}")
if not signature_valid: if not signature_valid:
return ( return (
False, False,
@ -582,6 +588,7 @@ async def handle_twilio_status_callback(
x_webhook_signature: Optional[str] = Header(None), x_webhook_signature: Optional[str] = Header(None),
): ):
"""Handle Twilio-specific status callbacks.""" """Handle Twilio-specific status callbacks."""
set_current_run_id(workflow_run_id)
# Parse form data # Parse form data
form_data = await request.form() form_data = await request.form()
@ -725,6 +732,7 @@ async def handle_vonage_events(
Vonage sends all call events to a single endpoint. Vonage sends all call events to a single endpoint.
Events include: started, ringing, answered, complete, failed, etc. Events include: started, ringing, answered, complete, failed, etc.
""" """
set_current_run_id(workflow_run_id)
# Parse the event data # Parse the event data
event_data = await request.json() event_data = await request.json()
logger.info(f"[run {workflow_run_id}] Received Vonage event: {event_data}") logger.info(f"[run {workflow_run_id}] Received Vonage event: {event_data}")
@ -802,6 +810,7 @@ async def handle_vobiz_xml_webhook(
Vobiz uses Plivo-compatible XML format similar to Twilio's TwiML. Vobiz uses Plivo-compatible XML format similar to Twilio's TwiML.
""" """
set_current_run_id(workflow_run_id)
logger.info( logger.info(
f"[run {workflow_run_id}] Vobiz XML webhook called - " f"[run {workflow_run_id}] Vobiz XML webhook called - "
f"workflow_id={workflow_id}, user_id={user_id}, org_id={organization_id}" f"workflow_id={workflow_id}, user_id={user_id}, org_id={organization_id}"
@ -834,7 +843,8 @@ async def handle_vobiz_hangup_callback(
Vobiz sends callbacks to hangup_url when the call terminates. Vobiz sends callbacks to hangup_url when the call terminates.
This includes call duration, status, and billing information. This includes call duration, status, and billing information.
""" """
# TODO: Remove this debug logging after Vobiz team clarifies webhook authentication set_current_run_id(workflow_run_id)
# Logging all headers and body to understand what Vobiz actually sends # Logging all headers and body to understand what Vobiz actually sends
all_headers = dict(request.headers) all_headers = dict(request.headers)
logger.info( logger.info(
@ -954,7 +964,8 @@ async def handle_vobiz_ring_callback(
Vobiz can send callbacks to ring_url when the call starts ringing. Vobiz can send callbacks to ring_url when the call starts ringing.
This is optional and used for tracking ringing status. This is optional and used for tracking ringing status.
""" """
# TODO: Remove this debug logging after Vobiz team clarifies webhook authentication set_current_run_id(workflow_run_id)
# Logging all headers and body to understand what Vobiz actually sends # Logging all headers and body to understand what Vobiz actually sends
all_headers = dict(request.headers) all_headers = dict(request.headers)
logger.info( logger.info(
@ -1056,6 +1067,7 @@ async def handle_cloudonix_status_callback(
Cloudonix sends call status updates to the callback URL specified during call initiation. Cloudonix sends call status updates to the callback URL specified during call initiation.
""" """
set_current_run_id(workflow_run_id)
# Parse callback data - determine if JSON or form data # Parse callback data - determine if JSON or form data
content_type = request.headers.get("content-type", "") content_type = request.headers.get("content-type", "")
@ -1194,6 +1206,7 @@ async def handle_vobiz_hangup_callback_by_workflow(
return {"status": "ignored", "reason": "workflow_run_not_found"} return {"status": "ignored", "reason": "workflow_run_not_found"}
workflow_run_id = workflow_run_row[0] workflow_run_id = workflow_run_row[0]
set_current_run_id(workflow_run_id)
logger.info( logger.info(
f"[workflow {workflow_id}] Found workflow run {workflow_run_id} for call {call_uuid}" f"[workflow {workflow_id}] Found workflow run {workflow_run_id} for call {call_uuid}"
) )
@ -1243,12 +1256,14 @@ async def handle_inbound_telephony(
x_twilio_signature: Optional[str] = Header(None), x_twilio_signature: Optional[str] = Header(None),
x_vobiz_signature: Optional[str] = Header(None), x_vobiz_signature: Optional[str] = Header(None),
x_vobiz_timestamp: Optional[str] = Header(None), x_vobiz_timestamp: Optional[str] = Header(None),
x_cx_apikey: Optional[str] = Header(None),
): ):
"""Handle inbound telephony calls from any supported provider with common processing""" """Handle inbound telephony calls from any supported provider with common processing"""
logger.info(f"Inbound call received for workflow_id: {workflow_id}") logger.info(f"Inbound call received for workflow_id: {workflow_id}")
try: try:
webhook_data, data_source = await parse_webhook_request(request) webhook_data, data_source = await parse_webhook_request(request)
logger.info(f"Inbound call data with data source: {data_source} and data :{dict(webhook_data)}")
headers = dict(request.headers) headers = dict(request.headers)
# Detect provider and normalize data # Detect provider and normalize data
@ -1293,6 +1308,7 @@ async def handle_inbound_telephony(
x_twilio_signature, x_twilio_signature,
x_vobiz_signature, x_vobiz_signature,
x_vobiz_timestamp, x_vobiz_timestamp,
x_cx_apikey,
) )
if not is_valid: if not is_valid:

View file

@ -276,16 +276,18 @@ class TelephonyProvider(ABC):
""" """
pass pass
@staticmethod
@abstractmethod @abstractmethod
def generate_inbound_response(self, websocket_url: str) -> tuple: async def generate_inbound_response(websocket_url: str, workflow_run_id: int = None) -> tuple:
""" """
Generate the appropriate response for an inbound webhook. Generate the appropriate response for an inbound webhook.
Args: Args:
websocket_url: WebSocket URL for audio streaming websocket_url: WebSocket URL for audio streaming
workflow_run_id: Optional workflow run ID for tracking
Returns: Returns:
Tuple of (Response, media_type) - Response object and content type FastAPI Response object
""" """
pass pass

View file

@ -71,6 +71,7 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
return { return {
"provider": "cloudonix", "provider": "cloudonix",
"bearer_token": config.value.get("bearer_token"), "bearer_token": config.value.get("bearer_token"),
"api_key": config.value.get("api_key"), # For x-cx-apikey validation
"domain_id": config.value.get("domain_id"), "domain_id": config.value.get("domain_id"),
"from_numbers": config.value.get("from_numbers", []), "from_numbers": config.value.get("from_numbers", []),
} }
@ -122,7 +123,8 @@ async def get_all_telephony_providers() -> List[Type[TelephonyProvider]]:
""" """
Get all available telephony provider classes for webhook detection. Get all available telephony provider classes for webhook detection.
Returns: Returns:
List of provider classes that can be used for webhook detection List of provider classes that can be used for webhook detection
""" """
return [TwilioProvider, VobizProvider, VonageProvider] return [CloudonixProvider, TwilioProvider, VobizProvider, VonageProvider]

View file

@ -429,60 +429,94 @@ class CloudonixProvider(TelephonyProvider):
) -> bool: ) -> bool:
""" """
Determine if this provider can handle the incoming webhook. Determine if this provider can handle the incoming webhook.
Cloudonix uses TwiML-compatible format, so look for Twilio-like identifiers
but also check for Cloudonix-specific headers or fields if they exist.
""" """
# Check for Cloudonix-specific headers # 1: Check User-Agent header
if headers.get("User-Agent", "").lower().startswith("cloudonix"): user_agent = headers.get("user-agent", "").lower()
if "cloudonix" in user_agent:
return True return True
# Check for session token (Cloudonix equivalent of CallSid) # 2: Check for Cloudonix-specific headers
if "token" in webhook_data or "session_token" in webhook_data: cloudonix_headers = ["x-cx-apikey", "x-cx-domain", "x-cx-session", "x-cx-source"]
if any(header in headers for header in cloudonix_headers):
return True return True
# If it looks like TwiML format but no other providers claimed it, # 3: Check data structure for Cloudonix-specific fields
# it could be Cloudonix (TwiML-compatible) if ("SessionData" in webhook_data and "Domain" in webhook_data and
if "CallSid" in webhook_data and "AccountSid" in webhook_data: webhook_data.get("Domain", "").endswith(".cloudonix.net")):
# Let Twilio provider handle this first, only handle if unclaimed return True
return False
# Check if AccountSid is a Cloudonix domain
account_sid = webhook_data.get("AccountSid", "")
if account_sid.endswith(".cloudonix.net"):
return True
return False return False
@staticmethod @staticmethod
def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData: def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData:
""" """
Parse Cloudonix-specific inbound webhook data into normalized format. Parse Cloudonix-specific inbound webhook data into normalized format.
Cloudonix is TwiML-compatible so the webhook format should be similar to Twilio. Cloudonix webhook structure includes:
- CallSid: Call id
- From: Caller number
- To: Called number
- AccountSid: Domain (e.g., "abc.cloudonix.net")
- SessionData: Contains additional call info including underlying provider details
""" """
session_data = webhook_data.get("SessionData", {})
token = session_data.get("token", "") if isinstance(session_data, dict) else ""
call_id = (webhook_data.get("Session") or
webhook_data.get("CallSid") or
token)
account_id = (webhook_data.get("Domain") or webhook_data.get("AccountSid", ""))
# Extract underlying provider information from SessionData if available
session_data = webhook_data.get("SessionData", {})
underlying_provider = None
if isinstance(session_data, dict):
profile = session_data.get("profile", {})
trunk_headers = profile.get("trunk-sip-headers", {})
if "Twilio-AccountSid" in trunk_headers:
underlying_provider = "twilio"
return NormalizedInboundData( return NormalizedInboundData(
provider=CloudonixProvider.PROVIDER_NAME, provider=CloudonixProvider.PROVIDER_NAME,
call_id=webhook_data.get("token") or webhook_data.get("CallSid", ""), call_id=call_id,
from_number=webhook_data.get("From", ""), from_number=webhook_data.get("From", ""),
to_number=webhook_data.get("To", ""), to_number=webhook_data.get("To", ""),
direction="inbound", # This is an inbound webhook direction=webhook_data.get("Direction", "inbound").lower(),
call_status=webhook_data.get("CallStatus", "ringing"), call_status=webhook_data.get("CallStatus", "in-progress"),
account_id=webhook_data.get("AccountSid") or webhook_data.get("domain_id"), account_id=account_id,
from_country=webhook_data.get("FromCountry"), from_country=webhook_data.get("FromCountry"),
to_country=webhook_data.get("ToCountry"), to_country=webhook_data.get("ToCountry"),
raw_data=webhook_data, raw_data={
**webhook_data,
"underlying_provider": underlying_provider,
},
) )
@staticmethod @staticmethod
def validate_account_id(config_data: dict, webhook_account_id: str) -> bool: def validate_account_id(config_data: dict, webhook_account_id: str) -> bool:
""" """
Validate that the account_id from webhook matches the Cloudonix configuration. Validate that the account_id from webhook matches the Cloudonix configuration.
For Cloudonix:
- webhook_account_id is the Domain field (e.g., "test1.cloudonix.net")
- config domain_id stores the same domain string
""" """
if not webhook_account_id: if not webhook_account_id:
return False return False
# Cloudonix uses domain_id as the account identifier # Get stored domain from config (stored under 'domain_id' key)
stored_domain_id = config_data.get("domain_id") stored_domain = config_data.get("domain_id")
if not stored_domain_id: if not stored_domain:
return False return False
return webhook_account_id == stored_domain_id return webhook_account_id == stored_domain
def normalize_phone_number(self, phone_number: str) -> str: def normalize_phone_number(self, phone_number: str) -> str:
""" """
@ -513,42 +547,91 @@ class CloudonixProvider(TelephonyProvider):
return clean_number return clean_number
async def verify_inbound_signature( async def verify_inbound_signature(
self, url: str, webhook_data: Dict[str, Any], signature: str self, url: str, webhook_data: Dict[str, Any], api_key: str
) -> bool: ) -> bool:
""" """
Verify the signature of an inbound Cloudonix webhook for security. Verify the API key of an inbound Cloudonix webhook for security.
Note: Cloudonix signature verification details need to be implemented Cloudonix uses x-cx-apikey header validation instead of signature verification.
based on their specific authentication method. For now, we'll log The API key from the webhook should match the bearer_token in our configuration.
and return True (similar to current webhook verification).
""" """
logger.info( if not api_key:
f"Cloudonix inbound signature verification not fully implemented. " logger.warning("No x-cx-apikey provided in Cloudonix webhook")
f"Webhook URL: {url}, Signature present: {bool(signature)}" return False
)
# The bearer_token in config is the same as x-cx-apikey header value
if not self.bearer_token:
logger.warning("No bearer_token configured for Cloudonix provider")
return False
# Compare the API keys
is_valid = api_key == self.bearer_token
if is_valid:
logger.info("Cloudonix x-cx-apikey validation successful")
else:
logger.warning(f"Cloudonix x-cx-apikey validation failed. Expected key ending with ...{self.bearer_token[-8:] if len(self.bearer_token) > 8 else 'SHORT_KEY'}")
return True #TODO: update this post clarification from cloudonix
# TODO: Implement actual Cloudonix signature verification
# This would depend on Cloudonix's specific signing method
return True
def generate_inbound_response(self, websocket_url: str) -> tuple: @staticmethod
async def generate_inbound_response(
websocket_url: str, workflow_run_id: int = None
) -> tuple:
""" """
Generate the appropriate TwiML response for an inbound Cloudonix webhook. Generate the appropriate CXML response for an inbound Cloudonix webhook.
Since Cloudonix is TwiML-compatible, we generate TwiML response. Returns CXML to connect to WebSocket, same format as outbound calls.
""" """
from fastapi import Response from fastapi import Response
# Generate TwiML response to connect to WebSocket # Generate CXML response (same format as outbound calls)
twiml = f"""<?xml version="1.0" encoding="UTF-8"?> cxml_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response> <Response>
<Connect> <Connect>
<Stream url="{websocket_url}"></Stream> <Stream url="{websocket_url}"></Stream>
</Connect> </Connect>
<Pause length="40"/> <Pause length="40"/>
</Response>""" </Response>"""
logger.info(f"Cloudonix inbound CXML response content:")
logger.info(cxml_content)
response = Response(
content=cxml_content,
media_type="application/xml"
)
logger.info(f"Cloudonix inbound response object: {response}")
logger.info(f"Response headers: {response.headers}")
logger.info(f"Response media type: {response.media_type}")
return response
return Response(content=twiml, media_type="application/xml"), "application/xml" @staticmethod
def generate_validation_error_response(error_type) -> tuple:
"""
Generate Cloudonix-specific error response for validation failures.
Since Cloudonix is TwiML-compatible, we use the same XML format.
"""
from fastapi import Response
from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError
message = TELEPHONY_ERROR_MESSAGES.get(
error_type, TELEPHONY_ERROR_MESSAGES[TelephonyError.GENERAL_AUTH_FAILED]
)
twiml_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say voice="alice">{message}</Say>
<Hangup/>
</Response>"""
return Response(content=twiml_content, media_type="application/xml")
@staticmethod @staticmethod
def generate_error_response(error_type: str, message: str) -> tuple: def generate_error_response(error_type: str, message: str) -> tuple:

View file

@ -297,9 +297,33 @@ class TwilioProvider(TelephonyProvider):
) -> bool: ) -> bool:
""" """
Determine if this provider can handle the incoming webhook. Determine if this provider can handle the incoming webhook.
Twilio webhooks contain CallSid field.
Twilio webhooks have specific characteristics:
- User-Agent: "TwilioProxy/1.1"
- Headers: "x-twilio-signature", "i-twilio-idempotency-token"
- Data: CallSid + AccountSid (AC prefix) + ApiVersion
- AccountSid format: starts with "AC" (not a domain)
""" """
return "CallSid" in webhook_data # 1: Check for Twilio-specific User-Agent
user_agent = headers.get("user-agent", "")
if "twilioproxy" in user_agent.lower() or user_agent.startswith("TwilioProxy"):
return True
# 2: Check for Twilio-specific headers
twilio_headers = ["x-twilio-signature", "i-twilio-idempotency-token", "x-home-region"]
if any(header in headers for header in twilio_headers):
return True
# 3: Check data structure - CallSid + AccountSid with AC prefix + ApiVersion
if ("CallSid" in webhook_data and
"AccountSid" in webhook_data and
"ApiVersion" in webhook_data):
# Ensure AccountSid looks like Twilio (starts with AC, not a domain)
account_sid = webhook_data.get("AccountSid", "")
if account_sid.startswith("AC") and not "." in account_sid:
return True
return False
@staticmethod @staticmethod
def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData: def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData: