feat: handle cloudonix incoming calls (#121)

This commit is contained in:
Sabiha Khan 2026-01-17 14:36:51 +05:30 committed by GitHub
parent cac25879bf
commit e2fa4bbb98
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 186 additions and 59 deletions

View file

@ -276,16 +276,18 @@ class TelephonyProvider(ABC):
"""
pass
@staticmethod
@abstractmethod
def generate_inbound_response(self, websocket_url: str) -> tuple:
async def generate_inbound_response(websocket_url: str, workflow_run_id: int = None) -> tuple:
"""
Generate the appropriate response for an inbound webhook.
Args:
websocket_url: WebSocket URL for audio streaming
workflow_run_id: Optional workflow run ID for tracking
Returns:
Tuple of (Response, media_type) - Response object and content type
FastAPI Response object
"""
pass

View file

@ -71,6 +71,7 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
return {
"provider": "cloudonix",
"bearer_token": config.value.get("bearer_token"),
"api_key": config.value.get("api_key"), # For x-cx-apikey validation
"domain_id": config.value.get("domain_id"),
"from_numbers": config.value.get("from_numbers", []),
}
@ -122,7 +123,8 @@ async def get_all_telephony_providers() -> List[Type[TelephonyProvider]]:
"""
Get all available telephony provider classes for webhook detection.
Returns:
List of provider classes that can be used for webhook detection
"""
return [TwilioProvider, VobizProvider, VonageProvider]
return [CloudonixProvider, TwilioProvider, VobizProvider, VonageProvider]

View file

@ -429,60 +429,94 @@ class CloudonixProvider(TelephonyProvider):
) -> bool:
"""
Determine if this provider can handle the incoming webhook.
Cloudonix uses TwiML-compatible format, so look for Twilio-like identifiers
but also check for Cloudonix-specific headers or fields if they exist.
"""
# Check for Cloudonix-specific headers
if headers.get("User-Agent", "").lower().startswith("cloudonix"):
# 1: Check User-Agent header
user_agent = headers.get("user-agent", "").lower()
if "cloudonix" in user_agent:
return True
# Check for session token (Cloudonix equivalent of CallSid)
if "token" in webhook_data or "session_token" in webhook_data:
# 2: Check for Cloudonix-specific headers
cloudonix_headers = ["x-cx-apikey", "x-cx-domain", "x-cx-session", "x-cx-source"]
if any(header in headers for header in cloudonix_headers):
return True
# If it looks like TwiML format but no other providers claimed it,
# it could be Cloudonix (TwiML-compatible)
if "CallSid" in webhook_data and "AccountSid" in webhook_data:
# Let Twilio provider handle this first, only handle if unclaimed
return False
# 3: Check data structure for Cloudonix-specific fields
if ("SessionData" in webhook_data and "Domain" in webhook_data and
webhook_data.get("Domain", "").endswith(".cloudonix.net")):
return True
# Check if AccountSid is a Cloudonix domain
account_sid = webhook_data.get("AccountSid", "")
if account_sid.endswith(".cloudonix.net"):
return True
return False
@staticmethod
def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData:
"""
Parse Cloudonix-specific inbound webhook data into normalized format.
Cloudonix is TwiML-compatible so the webhook format should be similar to Twilio.
Cloudonix webhook structure includes:
- CallSid: Call id
- From: Caller number
- To: Called number
- AccountSid: Domain (e.g., "abc.cloudonix.net")
- SessionData: Contains additional call info including underlying provider details
"""
session_data = webhook_data.get("SessionData", {})
token = session_data.get("token", "") if isinstance(session_data, dict) else ""
call_id = (webhook_data.get("Session") or
webhook_data.get("CallSid") or
token)
account_id = (webhook_data.get("Domain") or webhook_data.get("AccountSid", ""))
# Extract underlying provider information from SessionData if available
session_data = webhook_data.get("SessionData", {})
underlying_provider = None
if isinstance(session_data, dict):
profile = session_data.get("profile", {})
trunk_headers = profile.get("trunk-sip-headers", {})
if "Twilio-AccountSid" in trunk_headers:
underlying_provider = "twilio"
return NormalizedInboundData(
provider=CloudonixProvider.PROVIDER_NAME,
call_id=webhook_data.get("token") or webhook_data.get("CallSid", ""),
call_id=call_id,
from_number=webhook_data.get("From", ""),
to_number=webhook_data.get("To", ""),
direction="inbound", # This is an inbound webhook
call_status=webhook_data.get("CallStatus", "ringing"),
account_id=webhook_data.get("AccountSid") or webhook_data.get("domain_id"),
direction=webhook_data.get("Direction", "inbound").lower(),
call_status=webhook_data.get("CallStatus", "in-progress"),
account_id=account_id,
from_country=webhook_data.get("FromCountry"),
to_country=webhook_data.get("ToCountry"),
raw_data=webhook_data,
to_country=webhook_data.get("ToCountry"),
raw_data={
**webhook_data,
"underlying_provider": underlying_provider,
},
)
@staticmethod
def validate_account_id(config_data: dict, webhook_account_id: str) -> bool:
"""
Validate that the account_id from webhook matches the Cloudonix configuration.
For Cloudonix:
- webhook_account_id is the Domain field (e.g., "test1.cloudonix.net")
- config domain_id stores the same domain string
"""
if not webhook_account_id:
return False
# Cloudonix uses domain_id as the account identifier
stored_domain_id = config_data.get("domain_id")
if not stored_domain_id:
# Get stored domain from config (stored under 'domain_id' key)
stored_domain = config_data.get("domain_id")
if not stored_domain:
return False
return webhook_account_id == stored_domain_id
return webhook_account_id == stored_domain
def normalize_phone_number(self, phone_number: str) -> str:
"""
@ -513,42 +547,91 @@ class CloudonixProvider(TelephonyProvider):
return clean_number
async def verify_inbound_signature(
self, url: str, webhook_data: Dict[str, Any], signature: str
self, url: str, webhook_data: Dict[str, Any], api_key: str
) -> bool:
"""
Verify the signature of an inbound Cloudonix webhook for security.
Note: Cloudonix signature verification details need to be implemented
based on their specific authentication method. For now, we'll log
and return True (similar to current webhook verification).
Verify the API key of an inbound Cloudonix webhook for security.
Cloudonix uses x-cx-apikey header validation instead of signature verification.
The API key from the webhook should match the bearer_token in our configuration.
"""
logger.info(
f"Cloudonix inbound signature verification not fully implemented. "
f"Webhook URL: {url}, Signature present: {bool(signature)}"
)
if not api_key:
logger.warning("No x-cx-apikey provided in Cloudonix webhook")
return False
# The bearer_token in config is the same as x-cx-apikey header value
if not self.bearer_token:
logger.warning("No bearer_token configured for Cloudonix provider")
return False
# Compare the API keys
is_valid = api_key == self.bearer_token
if is_valid:
logger.info("Cloudonix x-cx-apikey validation successful")
else:
logger.warning(f"Cloudonix x-cx-apikey validation failed. Expected key ending with ...{self.bearer_token[-8:] if len(self.bearer_token) > 8 else 'SHORT_KEY'}")
return True #TODO: update this post clarification from cloudonix
# TODO: Implement actual Cloudonix signature verification
# This would depend on Cloudonix's specific signing method
return True
def generate_inbound_response(self, websocket_url: str) -> tuple:
@staticmethod
async def generate_inbound_response(
websocket_url: str, workflow_run_id: int = None
) -> tuple:
"""
Generate the appropriate TwiML response for an inbound Cloudonix webhook.
Since Cloudonix is TwiML-compatible, we generate TwiML response.
Generate the appropriate CXML response for an inbound Cloudonix webhook.
Returns CXML to connect to WebSocket, same format as outbound calls.
"""
from fastapi import Response
# Generate TwiML response to connect to WebSocket
twiml = f"""<?xml version="1.0" encoding="UTF-8"?>
# Generate CXML response (same format as outbound calls)
cxml_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="{websocket_url}"></Stream>
</Connect>
<Pause length="40"/>
</Response>"""
logger.info(f"Cloudonix inbound CXML response content:")
logger.info(cxml_content)
response = Response(
content=cxml_content,
media_type="application/xml"
)
logger.info(f"Cloudonix inbound response object: {response}")
logger.info(f"Response headers: {response.headers}")
logger.info(f"Response media type: {response.media_type}")
return response
return Response(content=twiml, media_type="application/xml"), "application/xml"
@staticmethod
def generate_validation_error_response(error_type) -> tuple:
"""
Generate Cloudonix-specific error response for validation failures.
Since Cloudonix is TwiML-compatible, we use the same XML format.
"""
from fastapi import Response
from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError
message = TELEPHONY_ERROR_MESSAGES.get(
error_type, TELEPHONY_ERROR_MESSAGES[TelephonyError.GENERAL_AUTH_FAILED]
)
twiml_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say voice="alice">{message}</Say>
<Hangup/>
</Response>"""
return Response(content=twiml_content, media_type="application/xml")
@staticmethod
def generate_error_response(error_type: str, message: str) -> tuple:

View file

@ -297,9 +297,33 @@ class TwilioProvider(TelephonyProvider):
) -> bool:
"""
Determine if this provider can handle the incoming webhook.
Twilio webhooks contain CallSid field.
Twilio webhooks have specific characteristics:
- User-Agent: "TwilioProxy/1.1"
- Headers: "x-twilio-signature", "i-twilio-idempotency-token"
- Data: CallSid + AccountSid (AC prefix) + ApiVersion
- AccountSid format: starts with "AC" (not a domain)
"""
return "CallSid" in webhook_data
# 1: Check for Twilio-specific User-Agent
user_agent = headers.get("user-agent", "")
if "twilioproxy" in user_agent.lower() or user_agent.startswith("TwilioProxy"):
return True
# 2: Check for Twilio-specific headers
twilio_headers = ["x-twilio-signature", "i-twilio-idempotency-token", "x-home-region"]
if any(header in headers for header in twilio_headers):
return True
# 3: Check data structure - CallSid + AccountSid with AC prefix + ApiVersion
if ("CallSid" in webhook_data and
"AccountSid" in webhook_data and
"ApiVersion" in webhook_data):
# Ensure AccountSid looks like Twilio (starts with AC, not a domain)
account_sid = webhook_data.get("AccountSid", "")
if account_sid.startswith("AC") and not "." in account_sid:
return True
return False
@staticmethod
def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData: