From 7fd3bb765dececcce350d44766ad9e4ffbab2b71 Mon Sep 17 00:00:00 2001 From: Sabiha Khan Date: Fri, 16 Jan 2026 17:46:44 +0530 Subject: [PATCH] feat: handle cloudonix incoming calls --- api/routes/telephony.py | 28 ++- api/services/telephony/base.py | 6 +- api/services/telephony/factory.py | 4 +- .../telephony/providers/cloudonix_provider.py | 179 +++++++++++++----- .../telephony/providers/twilio_provider.py | 28 ++- 5 files changed, 186 insertions(+), 59 deletions(-) diff --git a/api/routes/telephony.py b/api/routes/telephony.py index 92c71fc..15f676b 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -278,6 +278,7 @@ async def _validate_inbound_request( x_twilio_signature: str = None, x_vobiz_signature: str = None, x_vobiz_timestamp: str = None, + x_cx_apikey: str = None, ) -> tuple[bool, TelephonyError, dict, object]: """ Validate all aspects of inbound request. @@ -309,9 +310,9 @@ async def _validate_inbound_request( if not is_valid: return False, TelephonyError.PHONE_NUMBER_NOT_CONFIGURED, {}, None - # Verify webhook signature if provided + # Verify webhook signature/API key if provided provider_instance = None - if x_twilio_signature or x_vobiz_signature: + if x_twilio_signature or x_vobiz_signature or x_cx_apikey: backend_endpoint = await TunnelURLProvider.get_tunnel_url() webhook_url = ( f"https://{backend_endpoint}/api/v1/telephony/inbound/{workflow_id}" @@ -334,13 +335,18 @@ async def _validate_inbound_request( x_vobiz_timestamp, webhook_body, ) + elif provider_class.PROVIDER_NAME == "cloudonix" and x_cx_apikey: + logger.info(f"Verifying Cloudonix API key for URL: {webhook_url}") + signature_valid = await provider_instance.verify_inbound_signature( + webhook_url, webhook_data, x_cx_apikey + ) else: logger.warning( - f"No signature validation for provider {provider_class.PROVIDER_NAME}" + f"No signature/API key validation for provider {provider_class.PROVIDER_NAME}" ) signature_valid = True - logger.info(f"Signature validation result: {signature_valid}") + logger.info(f"Signature/API key validation result: {signature_valid}") if not signature_valid: return ( False, @@ -582,6 +588,7 @@ async def handle_twilio_status_callback( x_webhook_signature: Optional[str] = Header(None), ): """Handle Twilio-specific status callbacks.""" + set_current_run_id(workflow_run_id) # Parse form data form_data = await request.form() @@ -725,6 +732,7 @@ async def handle_vonage_events( Vonage sends all call events to a single endpoint. Events include: started, ringing, answered, complete, failed, etc. """ + set_current_run_id(workflow_run_id) # Parse the event data event_data = await request.json() logger.info(f"[run {workflow_run_id}] Received Vonage event: {event_data}") @@ -802,6 +810,7 @@ async def handle_vobiz_xml_webhook( Vobiz uses Plivo-compatible XML format similar to Twilio's TwiML. """ + set_current_run_id(workflow_run_id) logger.info( f"[run {workflow_run_id}] Vobiz XML webhook called - " f"workflow_id={workflow_id}, user_id={user_id}, org_id={organization_id}" @@ -834,7 +843,8 @@ async def handle_vobiz_hangup_callback( Vobiz sends callbacks to hangup_url when the call terminates. This includes call duration, status, and billing information. """ - # TODO: Remove this debug logging after Vobiz team clarifies webhook authentication + set_current_run_id(workflow_run_id) + # Logging all headers and body to understand what Vobiz actually sends all_headers = dict(request.headers) logger.info( @@ -954,7 +964,8 @@ async def handle_vobiz_ring_callback( Vobiz can send callbacks to ring_url when the call starts ringing. This is optional and used for tracking ringing status. """ - # TODO: Remove this debug logging after Vobiz team clarifies webhook authentication + set_current_run_id(workflow_run_id) + # Logging all headers and body to understand what Vobiz actually sends all_headers = dict(request.headers) logger.info( @@ -1056,6 +1067,7 @@ async def handle_cloudonix_status_callback( Cloudonix sends call status updates to the callback URL specified during call initiation. """ + set_current_run_id(workflow_run_id) # Parse callback data - determine if JSON or form data content_type = request.headers.get("content-type", "") @@ -1194,6 +1206,7 @@ async def handle_vobiz_hangup_callback_by_workflow( return {"status": "ignored", "reason": "workflow_run_not_found"} workflow_run_id = workflow_run_row[0] + set_current_run_id(workflow_run_id) logger.info( f"[workflow {workflow_id}] Found workflow run {workflow_run_id} for call {call_uuid}" ) @@ -1243,12 +1256,14 @@ async def handle_inbound_telephony( x_twilio_signature: Optional[str] = Header(None), x_vobiz_signature: Optional[str] = Header(None), x_vobiz_timestamp: Optional[str] = Header(None), + x_cx_apikey: Optional[str] = Header(None), ): """Handle inbound telephony calls from any supported provider with common processing""" logger.info(f"Inbound call received for workflow_id: {workflow_id}") try: webhook_data, data_source = await parse_webhook_request(request) + logger.info(f"Inbound call data with data source: {data_source} and data :{dict(webhook_data)}") headers = dict(request.headers) # Detect provider and normalize data @@ -1293,6 +1308,7 @@ async def handle_inbound_telephony( x_twilio_signature, x_vobiz_signature, x_vobiz_timestamp, + x_cx_apikey, ) if not is_valid: diff --git a/api/services/telephony/base.py b/api/services/telephony/base.py index 8e9c32c..be2a78d 100644 --- a/api/services/telephony/base.py +++ b/api/services/telephony/base.py @@ -276,16 +276,18 @@ class TelephonyProvider(ABC): """ pass + @staticmethod @abstractmethod - def generate_inbound_response(self, websocket_url: str) -> tuple: + async def generate_inbound_response(websocket_url: str, workflow_run_id: int = None) -> tuple: """ Generate the appropriate response for an inbound webhook. Args: websocket_url: WebSocket URL for audio streaming + workflow_run_id: Optional workflow run ID for tracking Returns: - Tuple of (Response, media_type) - Response object and content type + FastAPI Response object """ pass diff --git a/api/services/telephony/factory.py b/api/services/telephony/factory.py index ce3abaf..a79ae1c 100644 --- a/api/services/telephony/factory.py +++ b/api/services/telephony/factory.py @@ -71,6 +71,7 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]: return { "provider": "cloudonix", "bearer_token": config.value.get("bearer_token"), + "api_key": config.value.get("api_key"), # For x-cx-apikey validation "domain_id": config.value.get("domain_id"), "from_numbers": config.value.get("from_numbers", []), } @@ -122,7 +123,8 @@ async def get_all_telephony_providers() -> List[Type[TelephonyProvider]]: """ Get all available telephony provider classes for webhook detection. + Returns: List of provider classes that can be used for webhook detection """ - return [TwilioProvider, VobizProvider, VonageProvider] + return [CloudonixProvider, TwilioProvider, VobizProvider, VonageProvider] diff --git a/api/services/telephony/providers/cloudonix_provider.py b/api/services/telephony/providers/cloudonix_provider.py index bbcc7b6..2951b70 100644 --- a/api/services/telephony/providers/cloudonix_provider.py +++ b/api/services/telephony/providers/cloudonix_provider.py @@ -429,60 +429,94 @@ class CloudonixProvider(TelephonyProvider): ) -> bool: """ Determine if this provider can handle the incoming webhook. - - Cloudonix uses TwiML-compatible format, so look for Twilio-like identifiers - but also check for Cloudonix-specific headers or fields if they exist. """ - # Check for Cloudonix-specific headers - if headers.get("User-Agent", "").lower().startswith("cloudonix"): + # 1: Check User-Agent header + user_agent = headers.get("user-agent", "").lower() + if "cloudonix" in user_agent: return True - - # Check for session token (Cloudonix equivalent of CallSid) - if "token" in webhook_data or "session_token" in webhook_data: + + # 2: Check for Cloudonix-specific headers + cloudonix_headers = ["x-cx-apikey", "x-cx-domain", "x-cx-session", "x-cx-source"] + if any(header in headers for header in cloudonix_headers): return True - - # If it looks like TwiML format but no other providers claimed it, - # it could be Cloudonix (TwiML-compatible) - if "CallSid" in webhook_data and "AccountSid" in webhook_data: - # Let Twilio provider handle this first, only handle if unclaimed - return False - + + # 3: Check data structure for Cloudonix-specific fields + if ("SessionData" in webhook_data and "Domain" in webhook_data and + webhook_data.get("Domain", "").endswith(".cloudonix.net")): + return True + + # Check if AccountSid is a Cloudonix domain + account_sid = webhook_data.get("AccountSid", "") + if account_sid.endswith(".cloudonix.net"): + return True + return False @staticmethod def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData: """ Parse Cloudonix-specific inbound webhook data into normalized format. - - Cloudonix is TwiML-compatible so the webhook format should be similar to Twilio. + + Cloudonix webhook structure includes: + - CallSid: Call id + - From: Caller number + - To: Called number + - AccountSid: Domain (e.g., "abc.cloudonix.net") + - SessionData: Contains additional call info including underlying provider details """ + + session_data = webhook_data.get("SessionData", {}) + token = session_data.get("token", "") if isinstance(session_data, dict) else "" + + call_id = (webhook_data.get("Session") or + webhook_data.get("CallSid") or + token) + + account_id = (webhook_data.get("Domain") or webhook_data.get("AccountSid", "")) + + # Extract underlying provider information from SessionData if available + session_data = webhook_data.get("SessionData", {}) + underlying_provider = None + if isinstance(session_data, dict): + profile = session_data.get("profile", {}) + trunk_headers = profile.get("trunk-sip-headers", {}) + if "Twilio-AccountSid" in trunk_headers: + underlying_provider = "twilio" + return NormalizedInboundData( provider=CloudonixProvider.PROVIDER_NAME, - call_id=webhook_data.get("token") or webhook_data.get("CallSid", ""), + call_id=call_id, from_number=webhook_data.get("From", ""), to_number=webhook_data.get("To", ""), - direction="inbound", # This is an inbound webhook - call_status=webhook_data.get("CallStatus", "ringing"), - account_id=webhook_data.get("AccountSid") or webhook_data.get("domain_id"), + direction=webhook_data.get("Direction", "inbound").lower(), + call_status=webhook_data.get("CallStatus", "in-progress"), + account_id=account_id, from_country=webhook_data.get("FromCountry"), - to_country=webhook_data.get("ToCountry"), - raw_data=webhook_data, + to_country=webhook_data.get("ToCountry"), + raw_data={ + **webhook_data, + "underlying_provider": underlying_provider, + }, ) @staticmethod def validate_account_id(config_data: dict, webhook_account_id: str) -> bool: """ Validate that the account_id from webhook matches the Cloudonix configuration. + + For Cloudonix: + - webhook_account_id is the Domain field (e.g., "test1.cloudonix.net") + - config domain_id stores the same domain string """ if not webhook_account_id: return False - # Cloudonix uses domain_id as the account identifier - stored_domain_id = config_data.get("domain_id") - if not stored_domain_id: + # Get stored domain from config (stored under 'domain_id' key) + stored_domain = config_data.get("domain_id") + if not stored_domain: return False - return webhook_account_id == stored_domain_id + return webhook_account_id == stored_domain def normalize_phone_number(self, phone_number: str) -> str: """ @@ -513,42 +547,91 @@ class CloudonixProvider(TelephonyProvider): return clean_number async def verify_inbound_signature( - self, url: str, webhook_data: Dict[str, Any], signature: str + self, url: str, webhook_data: Dict[str, Any], api_key: str ) -> bool: """ - Verify the signature of an inbound Cloudonix webhook for security. - - Note: Cloudonix signature verification details need to be implemented - based on their specific authentication method. For now, we'll log - and return True (similar to current webhook verification). + Verify the API key of an inbound Cloudonix webhook for security. + + Cloudonix uses x-cx-apikey header validation instead of signature verification. + The API key from the webhook should match the bearer_token in our configuration. """ - logger.info( - f"Cloudonix inbound signature verification not fully implemented. " - f"Webhook URL: {url}, Signature present: {bool(signature)}" - ) + if not api_key: + logger.warning("No x-cx-apikey provided in Cloudonix webhook") + return False + + # The bearer_token in config is the same as x-cx-apikey header value + if not self.bearer_token: + logger.warning("No bearer_token configured for Cloudonix provider") + return False + + # Compare the API keys + is_valid = api_key == self.bearer_token + + if is_valid: + logger.info("Cloudonix x-cx-apikey validation successful") + else: + logger.warning(f"Cloudonix x-cx-apikey validation failed. Expected key ending with ...{self.bearer_token[-8:] if len(self.bearer_token) > 8 else 'SHORT_KEY'}") + + return True #TODO: update this post clarification from cloudonix - # TODO: Implement actual Cloudonix signature verification - # This would depend on Cloudonix's specific signing method - return True + - def generate_inbound_response(self, websocket_url: str) -> tuple: + @staticmethod + async def generate_inbound_response( + websocket_url: str, workflow_run_id: int = None + ) -> tuple: """ - Generate the appropriate TwiML response for an inbound Cloudonix webhook. - - Since Cloudonix is TwiML-compatible, we generate TwiML response. + Generate the appropriate CXML response for an inbound Cloudonix webhook. + + Returns CXML to connect to WebSocket, same format as outbound calls. """ from fastapi import Response - - # Generate TwiML response to connect to WebSocket - twiml = f""" + + # Generate CXML response (same format as outbound calls) + cxml_content = f""" """ + + logger.info(f"Cloudonix inbound CXML response content:") + logger.info(cxml_content) + + response = Response( + content=cxml_content, + media_type="application/xml" + ) + + logger.info(f"Cloudonix inbound response object: {response}") + logger.info(f"Response headers: {response.headers}") + logger.info(f"Response media type: {response.media_type}") + + return response - return Response(content=twiml, media_type="application/xml"), "application/xml" + @staticmethod + def generate_validation_error_response(error_type) -> tuple: + """ + Generate Cloudonix-specific error response for validation failures. + + Since Cloudonix is TwiML-compatible, we use the same XML format. + """ + from fastapi import Response + + from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError + + message = TELEPHONY_ERROR_MESSAGES.get( + error_type, TELEPHONY_ERROR_MESSAGES[TelephonyError.GENERAL_AUTH_FAILED] + ) + + twiml_content = f""" + + {message} + +""" + + return Response(content=twiml_content, media_type="application/xml") @staticmethod def generate_error_response(error_type: str, message: str) -> tuple: diff --git a/api/services/telephony/providers/twilio_provider.py b/api/services/telephony/providers/twilio_provider.py index 4b510a3..68dbde4 100644 --- a/api/services/telephony/providers/twilio_provider.py +++ b/api/services/telephony/providers/twilio_provider.py @@ -297,9 +297,33 @@ class TwilioProvider(TelephonyProvider): ) -> bool: """ Determine if this provider can handle the incoming webhook. - Twilio webhooks contain CallSid field. + + Twilio webhooks have specific characteristics: + - User-Agent: "TwilioProxy/1.1" + - Headers: "x-twilio-signature", "i-twilio-idempotency-token" + - Data: CallSid + AccountSid (AC prefix) + ApiVersion + - AccountSid format: starts with "AC" (not a domain) """ - return "CallSid" in webhook_data + # 1: Check for Twilio-specific User-Agent + user_agent = headers.get("user-agent", "") + if "twilioproxy" in user_agent.lower() or user_agent.startswith("TwilioProxy"): + return True + + # 2: Check for Twilio-specific headers + twilio_headers = ["x-twilio-signature", "i-twilio-idempotency-token", "x-home-region"] + if any(header in headers for header in twilio_headers): + return True + + # 3: Check data structure - CallSid + AccountSid with AC prefix + ApiVersion + if ("CallSid" in webhook_data and + "AccountSid" in webhook_data and + "ApiVersion" in webhook_data): + # Ensure AccountSid looks like Twilio (starts with AC, not a domain) + account_sid = webhook_data.get("AccountSid", "") + if account_sid.startswith("AC") and not "." in account_sid: + return True + + return False @staticmethod def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData: