diff --git a/api/alembic/versions/2be183567909_add_cloudonix_mode_for_workflow.py b/api/alembic/versions/2be183567909_add_cloudonix_mode_for_workflow.py new file mode 100644 index 0000000..3730f5c --- /dev/null +++ b/api/alembic/versions/2be183567909_add_cloudonix_mode_for_workflow.py @@ -0,0 +1,69 @@ +"""add cloudonix mode for workflow + +Revision ID: 2be183567909 +Revises: 36b5dbf670e4 +Create Date: 2025-12-02 18:30:36.286830 + +""" + +from typing import Sequence, Union + +from alembic import op +from alembic_postgresql_enum import TableReference + +# revision identifiers, used by Alembic. +revision: str = "2be183567909" +down_revision: Union[str, None] = "36b5dbf670e4" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.sync_enum_values( + enum_schema="public", + enum_name="workflow_run_mode", + new_values=[ + "twilio", + "vonage", + "vobiz", + "cloudonix", + "stasis", + "webrtc", + "smallwebrtc", + "VOICE", + "CHAT", + ], + affected_columns=[ + TableReference( + table_schema="public", table_name="workflow_runs", column_name="mode" + ) + ], + enum_values_to_rename=[], + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.sync_enum_values( + enum_schema="public", + enum_name="workflow_run_mode", + new_values=[ + "twilio", + "vonage", + "vobiz", + "stasis", + "webrtc", + "smallwebrtc", + "VOICE", + "CHAT", + ], + affected_columns=[ + TableReference( + table_schema="public", table_name="workflow_runs", column_name="mode" + ) + ], + enum_values_to_rename=[], + ) + # ### end Alembic commands ### diff --git a/api/enums.py b/api/enums.py index 7f3aff1..4c0295d 100644 --- a/api/enums.py +++ b/api/enums.py @@ -16,6 +16,7 @@ class WorkflowRunMode(Enum): TWILIO = "twilio" VONAGE = "vonage" VOBIZ = "vobiz" + CLOUDONIX = "cloudonix" STASIS = "stasis" WEBRTC = "webrtc" SMALLWEBRTC = "smallwebrtc" diff --git a/api/routes/organization.py b/api/routes/organization.py index 4fc7abc..1a14b8a 100644 --- a/api/routes/organization.py +++ b/api/routes/organization.py @@ -6,6 +6,8 @@ from api.db import db_client from api.db.models import UserModel from api.enums import OrganizationConfigurationKey from api.schemas.telephony_config import ( + CloudonixConfigurationRequest, + CloudonixConfigurationResponse, TelephonyConfigurationResponse, TwilioConfigurationRequest, TwilioConfigurationResponse, @@ -24,6 +26,7 @@ PROVIDER_MASKED_FIELDS = { "twilio": ["account_sid", "auth_token"], "vonage": ["private_key", "api_key", "api_secret"], "vobiz": ["auth_id", "auth_token"], + "cloudonix": ["bearer_token"], } @@ -60,6 +63,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)): ), vonage=None, vobiz=None, + cloudonix=None, ) elif stored_provider == "vonage": application_id = config.value.get("application_id", "") @@ -83,6 +87,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)): from_numbers=from_numbers, ), vobiz=None, + cloudonix=None, ) elif stored_provider == "vobiz": auth_id = config.value.get("auth_id", "") @@ -100,6 +105,23 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)): auth_token=mask_key(auth_token) if auth_token else "", from_numbers=from_numbers, ), + cloudonix=None, + ) + elif stored_provider == "cloudonix": + bearer_token = config.value.get("bearer_token", "") + domain_id = config.value.get("domain_id", "") + from_numbers = config.value.get("from_numbers", []) + + return TelephonyConfigurationResponse( + twilio=None, + vonage=None, + cloudonix=CloudonixConfigurationResponse( + provider="cloudonix", + bearer_token=mask_key(bearer_token) if bearer_token else "", + domain_id=domain_id, + from_numbers=from_numbers, + ), + vobiz=None, ) else: return TelephonyConfigurationResponse() @@ -111,6 +133,7 @@ async def save_telephony_configuration( TwilioConfigurationRequest, VonageConfigurationRequest, VobizConfigurationRequest, + CloudonixConfigurationRequest, ], user: UserModel = Depends(get_user), ): @@ -148,6 +171,13 @@ async def save_telephony_configuration( "auth_token": request.auth_token, "from_numbers": request.from_numbers, } + elif request.provider == "cloudonix": + config_value = { + "provider": "cloudonix", + "bearer_token": request.bearer_token, + "domain_id": request.domain_id, + "from_numbers": request.from_numbers, + } else: raise HTTPException( status_code=400, detail=f"Unsupported provider: {request.provider}" diff --git a/api/routes/telephony.py b/api/routes/telephony.py index 239a32f..53ca780 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -152,11 +152,14 @@ async def initiate_call( f"&organization_id={user.selected_organization_id}" ) + keywords = {"workflow_id": request.workflow_id, "user_id": user.id} + # Initiate call via provider result = await provider.initiate_call( to_number=phone_number, webhook_url=webhook_url, workflow_run_id=workflow_run_id, + **keywords, ) # Store provider type and any provider-specific metadata in workflow run context @@ -303,6 +306,7 @@ async def handle_twilio_status_callback( # Parse form data form_data = await request.form() callback_data = dict(form_data) + logger.info( f"[run {workflow_run_id}] Received status callback: {json.dumps(callback_data)}" ) @@ -646,3 +650,60 @@ async def handle_vobiz_ring_callback( logger.info(f"[run {workflow_run_id}] Vobiz ring callback logged") return {"status": "success"} + + +@router.post("/cloudonix/status-callback/{workflow_run_id}") +async def handle_cloudonix_status_callback( + workflow_run_id: int, + request: Request, +): + """Handle Cloudonix-specific status callbacks. + + Cloudonix sends call status updates to the callback URL specified during call initiation. + """ + # Parse callback data - determine if JSON or form data + content_type = request.headers.get("content-type", "") + + if "application/json" in content_type: + callback_data = await request.json() + else: + # Assume form data (like Twilio) + form_data = await request.form() + callback_data = dict(form_data) + + logger.info( + f"[run {workflow_run_id}] Received Cloudonix status callback: {json.dumps(callback_data)}" + ) + + # Get workflow run to find organization + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + if not workflow_run: + logger.warning(f"Workflow run {workflow_run_id} not found for status callback") + return {"status": "ignored", "reason": "workflow_run_not_found"} + + # Get workflow and provider + workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) + if not workflow: + logger.warning(f"Workflow {workflow_run.workflow_id} not found") + return {"status": "ignored", "reason": "workflow_not_found"} + + provider = await get_telephony_provider(workflow.organization_id) + + # Parse the callback data into generic format + parsed_data = provider.parse_status_callback(callback_data) + + # Create StatusCallbackRequest from parsed data + status_update = StatusCallbackRequest( + call_id=parsed_data["call_id"], + status=parsed_data["status"], + from_number=parsed_data.get("from_number"), + to_number=parsed_data.get("to_number"), + direction=parsed_data.get("direction"), + duration=parsed_data.get("duration"), + extra=parsed_data.get("extra", {}), + ) + + # Process the status update + await _process_status_update(workflow_run_id, status_update, workflow_run) + + return {"status": "success"} diff --git a/api/schemas/telephony_config.py b/api/schemas/telephony_config.py index d783cc2..9bc20ea 100644 --- a/api/schemas/telephony_config.py +++ b/api/schemas/telephony_config.py @@ -69,9 +69,30 @@ class VobizConfigurationResponse(BaseModel): from_numbers: List[str] +class CloudonixConfigurationRequest(BaseModel): + """Request schema for Cloudonix configuration.""" + + provider: str = Field(default="cloudonix") + bearer_token: str = Field(..., description="Cloudonix API Bearer Token") + domain_id: str = Field(..., description="Cloudonix Domain ID") + from_numbers: List[str] = Field( + default_factory=list, description="List of Cloudonix phone numbers (optional)" + ) + + +class CloudonixConfigurationResponse(BaseModel): + """Response schema for Cloudonix configuration with masked sensitive fields.""" + + provider: str + bearer_token: str # Masked (e.g., "****************abc1") + domain_id: str # Not sensitive, can show full + from_numbers: List[str] + + class TelephonyConfigurationResponse(BaseModel): """Top-level telephony configuration response.""" twilio: Optional[TwilioConfigurationResponse] = None vonage: Optional[VonageConfigurationResponse] = None vobiz: Optional[VobizConfigurationResponse] = None + cloudonix: Optional[CloudonixConfigurationResponse] = None diff --git a/api/services/pipecat/audio_config.py b/api/services/pipecat/audio_config.py index 966e8e0..829a8be 100644 --- a/api/services/pipecat/audio_config.py +++ b/api/services/pipecat/audio_config.py @@ -87,7 +87,7 @@ def create_audio_config(transport_type: str) -> AudioConfig: """Create audio configuration based on transport type. Args: - transport_type: Type of transport ("webrtc", "twilio", "vonage", "stasis") + transport_type: Type of transport ("webrtc", "twilio", "vonage", "vobiz", "cloudonix", "stasis") Returns: AudioConfig instance with appropriate settings @@ -96,8 +96,9 @@ def create_audio_config(transport_type: str) -> AudioConfig: WorkflowRunMode.STASIS.value, WorkflowRunMode.TWILIO.value, WorkflowRunMode.VOBIZ.value, + WorkflowRunMode.CLOUDONIX.value, ): - # Twilio, Vobiz, and Stasis use MULAW at 8kHz + # Twilio, Cloudonix, Vobiz, and Stasis use MULAW at 8kHz return AudioConfig( transport_in_sample_rate=8000, transport_out_sample_rate=8000, diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 59ecec6..82e5c71 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -30,6 +30,7 @@ from api.services.pipecat.service_factory import ( ) from api.services.pipecat.tracing_config import setup_pipeline_tracing from api.services.pipecat.transport_setup import ( + create_cloudonix_transport, create_stasis_transport, create_twilio_transport, create_vobiz_transport, @@ -240,6 +241,66 @@ async def run_pipeline_vobiz( raise +async def run_pipeline_cloudonix( + websocket_client: WebSocket, + stream_sid: str, + call_sid: str, + workflow_id: int, + workflow_run_id: int, + user_id: int, +) -> None: + """Run pipeline for Cloudonix connections""" + logger.debug( + f"Running pipeline for Cloudonix connection with workflow_id: {workflow_id} and workflow_run_id: {workflow_run_id}" + ) + set_current_run_id(workflow_run_id) + + # Store call ID in cost_info for later cost calculation (provider-agnostic) + cost_info = {"call_id": call_sid} + await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info) + + # Get workflow to extract all pipeline configurations + workflow = await db_client.get_workflow(workflow_id, user_id) + vad_config = None + ambient_noise_config = None + if workflow and workflow.workflow_configurations: + if "vad_configuration" in workflow.workflow_configurations: + vad_config = workflow.workflow_configurations["vad_configuration"] + if "ambient_noise_configuration" in workflow.workflow_configurations: + ambient_noise_config = workflow.workflow_configurations[ + "ambient_noise_configuration" + ] + + # Retrieve session_token from workflow_run gathered_context + workflow_run = await db_client.get_workflow_run(workflow_run_id) + session_token = None + if workflow_run and workflow_run.gathered_context: + session_token = workflow_run.gathered_context.get("session_token") + logger.debug(f"Retrieved session_token from workflow_run: {session_token}") + + # Create audio configuration for Cloudonix + audio_config = create_audio_config(WorkflowRunMode.CLOUDONIX.value) + + transport = await create_cloudonix_transport( + websocket_client, + stream_sid, + call_sid, + workflow_run_id, + audio_config, + workflow.organization_id, + vad_config, + ambient_noise_config, + session_token, + ) + await _run_pipeline( + transport, + workflow_id, + workflow_run_id, + user_id, + audio_config=audio_config, + ) + + async def run_pipeline_smallwebrtc( webrtc_connection: SmallWebRTCConnection, workflow_id: int, diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py index eb8811d..fd5d20c 100644 --- a/api/services/pipecat/transport_setup.py +++ b/api/services/pipecat/transport_setup.py @@ -127,6 +127,88 @@ async def create_twilio_transport( ), ) +async def create_cloudonix_transport( + websocket_client: WebSocket, + stream_sid: str, + call_sid: str, + workflow_run_id: int, + audio_config: AudioConfig, + organization_id: int, + vad_config: dict | None = None, + ambient_noise_config: dict | None = None, + session_token: str | None = None, +): + """Create a transport for Cloudonix connections""" + + # Load Cloudonix configuration from database + from api.services.telephony.factory import load_telephony_config + + config = await load_telephony_config(organization_id) + + if config.get("provider") != "cloudonix": + raise ValueError(f"Expected Cloudonix provider, got {config.get('provider')}") + + bearer_token = config.get("bearer_token") + domain_id = config.get("domain_id") + + if not bearer_token or not domain_id: + raise ValueError( + f"Incomplete Cloudonix configuration for organization {organization_id}. " + f"Required: bearer_token, domain_id" + ) + + turn_analyzer = create_turn_analyzer(workflow_run_id, audio_config) + + from pipecat.serializers.cloudonix import CloudonixFrameSerializer + + serializer = CloudonixFrameSerializer( + stream_sid=stream_sid, + call_sid=call_sid, + domain_id=domain_id, + bearer_token=bearer_token, + session_token=session_token, + ) + + return FastAPIWebsocketTransport( + websocket=websocket_client, + params=FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + audio_in_sample_rate=audio_config.transport_in_sample_rate, + audio_out_sample_rate=audio_config.transport_out_sample_rate, + vad_analyzer=( + SileroVADAnalyzer( + params=VADParams( + confidence=vad_config.get("confidence", 0.7), + start_secs=vad_config.get("start_seconds", 0.4), + stop_secs=vad_config.get("stop_seconds", 0.8), + min_volume=vad_config.get("minimum_volume", 0.6), + ) + ) + if vad_config + else SileroVADAnalyzer() + ), # Sample rate will be set by transport + audio_out_mixer=( + SoundfileMixer( + sound_files={ + "office": APP_ROOT_DIR + / "assets" + / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav" + }, + default_sound="office", + volume=ambient_noise_config.get("volume", 0.3), + ) + if ambient_noise_config and ambient_noise_config.get("enabled", False) + else SilenceAudioMixer() + ), + turn_analyzer=turn_analyzer, + serializer=serializer, + audio_in_filter=RNNoiseFilter(library_path=librnnoise_path) + if ENABLE_RNNOISE + else None, + ), + ) + async def create_vonage_transport( websocket_client, diff --git a/api/services/telephony/factory.py b/api/services/telephony/factory.py index 9da13ce..eb56316 100644 --- a/api/services/telephony/factory.py +++ b/api/services/telephony/factory.py @@ -11,6 +11,7 @@ from loguru import logger from api.db import db_client from api.enums import OrganizationConfigurationKey from api.services.telephony.base import TelephonyProvider +from api.services.telephony.providers.cloudonix_provider import CloudonixProvider from api.services.telephony.providers.twilio_provider import TwilioProvider from api.services.telephony.providers.vobiz_provider import VobizProvider from api.services.telephony.providers.vonage_provider import VonageProvider @@ -66,6 +67,13 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]: "auth_token": config.value.get("auth_token"), "from_numbers": config.value.get("from_numbers", []), } + elif provider == "cloudonix": + return { + "provider": "cloudonix", + "bearer_token": config.value.get("bearer_token"), + "domain_id": config.value.get("domain_id"), + "from_numbers": config.value.get("from_numbers", []), + } else: raise ValueError(f"Unknown provider in config: {provider}") @@ -103,5 +111,8 @@ async def get_telephony_provider(organization_id: int) -> TelephonyProvider: elif provider_type == "vobiz": return VobizProvider(config) + elif provider_type == "cloudonix": + return CloudonixProvider(config) + else: raise ValueError(f"Unknown telephony provider: {provider_type}") diff --git a/api/services/telephony/providers/cloudonix_provider.py b/api/services/telephony/providers/cloudonix_provider.py new file mode 100644 index 0000000..e62142b --- /dev/null +++ b/api/services/telephony/providers/cloudonix_provider.py @@ -0,0 +1,418 @@ +""" +Cloudonix implementation of the TelephonyProvider interface. +""" + +import json +import random +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +import aiohttp +from loguru import logger + +from api.enums import WorkflowRunMode +from api.services.telephony.base import CallInitiationResult, TelephonyProvider +from api.utils.tunnel import TunnelURLProvider + +if TYPE_CHECKING: + from fastapi import WebSocket + + +class CloudonixProvider(TelephonyProvider): + """ + Cloudonix implementation of TelephonyProvider. + Uses Bearer token authentication and is TwiML-compatible for WebSocket audio. + """ + + PROVIDER_NAME = WorkflowRunMode.CLOUDONIX.value + WEBHOOK_ENDPOINT = "twiml" # Cloudonix is TwiML-compatible + + def __init__(self, config: Dict[str, Any]): + """ + Initialize CloudonixProvider with configuration. + + Args: + config: Dictionary containing: + - bearer_token: Cloudonix API Bearer Token + - domain_id: Cloudonix Domain ID + - from_numbers: List of phone numbers to use (optional, fetched from API if not provided) + """ + self.bearer_token = config.get("bearer_token") + self.domain_id = config.get("domain_id") + self.from_numbers = config.get("from_numbers", []) + + # Handle both single number (string) and multiple numbers (list) + if isinstance(self.from_numbers, str): + self.from_numbers = [self.from_numbers] + + self.base_url = "https://api.cloudonix.io" + + def _get_auth_headers(self) -> Dict[str, str]: + """Generate authorization headers for Cloudonix API.""" + return { + "Authorization": f"Bearer {self.bearer_token}", + "Content-Type": "application/json", + } + + async def initiate_call( + self, + to_number: str, + webhook_url: str, + workflow_run_id: Optional[int] = None, + **kwargs: Any, + ) -> CallInitiationResult: + """ + Initiate an outbound call via Cloudonix. + + Note: webhook_url parameter is ignored for Cloudonix. Unlike Twilio/Vonage, + Cloudonix embeds CXML directly in the API call rather than using webhook callbacks. + """ + if not self.validate_config(): + raise ValueError("Cloudonix provider not properly configured") + + endpoint = f"{self.base_url}/calls/{self.domain_id}/application" + + # Select a random phone number for caller-id (REQUIRED by Cloudonix) + if not self.from_numbers: + raise ValueError( + "No phone numbers configured for Cloudonix provider. " + "At least one phone number is required as 'caller-id' for outbound calls. " + "Please configure phone numbers in the telephony settings." + ) + + from_number = random.choice(self.from_numbers) + logger.info( + f"Selected phone number {from_number} for outbound call to {to_number}" + ) + workflow_id, user_id = kwargs["workflow_id"], kwargs["user_id"] + + # Prepare call data using Cloudonix callObject schema + # Note: 'caller-id' is REQUIRED by Cloudonix API + backend_endpoint = await TunnelURLProvider.get_tunnel_url() + data: Dict[str, Any] = { + "destination": to_number, + "cxml": f""" + + + + + +""", + "caller-id": from_number, # Required field + } + + # Add status callback if workflow_run_id provided + if workflow_run_id: + callback_url = f"https://{backend_endpoint}/api/v1/telephony/cloudonix/status-callback/{workflow_run_id}" + data["callback"] = callback_url + + # Merge any additional kwargs + data.update(kwargs) + + # Make the API request + headers = self._get_auth_headers() + + # Log request details (mask sensitive token) + masked_headers = { + k: v if k != "Authorization" else f"Bearer {self.bearer_token[:8]}..." + for k, v in headers.items() + } + logger.info( + f"[Cloudonix] Initiating outbound call:\n" + f" Endpoint: {endpoint}\n" + f" To: {to_number}\n" + f" From: {from_number}\n" + f" Workflow Run ID: {workflow_run_id}" + ) + logger.debug( + f"[Cloudonix] Request details:\n" + f" Headers: {masked_headers}\n" + f" Payload: {json.dumps(data, indent=2)}" + ) + + async with aiohttp.ClientSession() as session: + async with session.post(endpoint, json=data, headers=headers) as response: + response_text = await response.text() + response_status = response.status + + # Log response + logger.info( + f"[Cloudonix] API Response:\n" + f" HTTP Status: {response_status}\n" + f" Response Body: {response_text}" + ) + + if response_status != 200: + logger.error( + f"[Cloudonix] Call initiation FAILED:\n" + f" HTTP Status: {response_status}\n" + f" Error Details: {response_text}\n" + f" Request: POST {endpoint}\n" + f" Payload: {json.dumps(data, indent=2)}" + ) + raise Exception( + f"Failed to initiate call via Cloudonix (HTTP {response_status}): {response_text}" + ) + + response_data = await response.json() + + # Extract session token (call ID) and other metadata + session_token = response_data.get("token") + domain_id = response_data.get("domainId") + subscriber_id = response_data.get("subscriberId") + + if not session_token: + logger.error( + f"[Cloudonix] Missing session token in response:\n" + f" Response: {json.dumps(response_data, indent=2)}" + ) + raise Exception("No session token returned from Cloudonix") + + logger.info( + f"[Cloudonix] Call initiated successfully:\n" + f" Session Token: {session_token}\n" + f" Domain ID: {domain_id}\n" + f" Subscriber ID: {subscriber_id}\n" + f" To: {to_number}\n" + f" From: {from_number}\n" + f" Workflow Run ID: {workflow_run_id}" + ) + + return CallInitiationResult( + call_id=session_token, + status="initiated", + provider_metadata={ + "session_token": session_token, + "domain_id": domain_id, + "subscriber_id": subscriber_id, + }, + raw_response=response_data, + ) + + async def get_call_status(self, call_id: str) -> Dict[str, Any]: + """ + Get the current status of a Cloudonix call (session). + + Args: + call_id: The session token returned from call initiation + """ + if not self.validate_config(): + raise ValueError("Cloudonix provider not properly configured") + + endpoint = ( + f"{self.base_url}/customers/self/domains/" + f"{self.domain_id}/sessions/{call_id}" + ) + + headers = self._get_auth_headers() + async with aiohttp.ClientSession() as session: + async with session.get(endpoint, headers=headers) as response: + if response.status != 200: + error_data = await response.text() + logger.error(f"Failed to get call status: {error_data}") + raise Exception(f"Failed to get call status: {error_data}") + + return await response.json() + + async def get_available_phone_numbers(self) -> List[str]: + """ + Get list of available Cloudonix phone numbers (DNIDs). + """ + # If phone numbers are already configured, return them + if self.from_numbers: + return self.from_numbers + + # Otherwise, fetch from API + if not self.validate_config(): + raise ValueError("Cloudonix provider not properly configured") + + endpoint = f"{self.base_url}/customers/self/domains/{self.domain_id}/dnids" + + headers = self._get_auth_headers() + try: + async with aiohttp.ClientSession() as session: + async with session.get(endpoint, headers=headers) as response: + if response.status != 200: + logger.warning( + f"Failed to fetch DNIDs from Cloudonix: {response.status}" + ) + return [] + + dnids = await response.json() + + # Extract phone numbers from DNID objects + # Use "source" field which contains the original phone number + phone_numbers = [ + dnid.get("source") or dnid.get("dnid") + for dnid in dnids + if dnid.get("source") or dnid.get("dnid") + ] + + # Cache the fetched numbers + self.from_numbers = phone_numbers + return phone_numbers + + except Exception as e: + logger.error(f"Exception fetching Cloudonix DNIDs: {e}") + return [] + + def validate_config(self) -> bool: + """ + Validate Cloudonix configuration. + """ + return bool(self.bearer_token and self.domain_id) + + async def verify_webhook_signature( + self, url: str, params: Dict[str, Any], signature: str + ) -> bool: + """ + Dummy implementation - Cloudonix doesn't use webhook signature verification. + + Cloudonix embeds CXML directly in the API call during initiate_call(), + so webhook endpoints are never called and signature verification is not needed. + This method only exists to satisfy the abstract base class requirement. + + Always returns True since no actual webhook verification is performed. + """ + logger.warning( + "verify_webhook_signature called for Cloudonix - this should not happen. " + "Cloudonix embeds CXML directly in API calls and doesn't use webhook callbacks." + ) + return True + + async def get_call_cost(self, call_id: str) -> Dict[str, Any]: + """ + Get cost information for a completed Cloudonix call. + + Note: Cloudonix does not currently support call cost retrieval via API. + This method returns zero cost. + + Args: + call_id: The Cloudonix session token + + Returns: + Dict containing cost information (all zeros for now) + """ + logger.info( + f"Cloudonix does not support call cost retrieval - returning zero cost for call {call_id}" + ) + + return { + "cost_usd": 0.0, + "duration": 0, + "status": "unknown", + "error": "Cloudonix does not support cost retrieval", + } + + def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]: + """ + Parse Cloudonix status callback data into generic format. + + Note: The exact format of Cloudonix status callbacks needs to be confirmed. + This implementation assumes a similar structure to Twilio. + """ + # Map Cloudonix status values to common format + # These mappings may need adjustment based on actual Cloudonix callback format + status_map = { + "initiated": "initiated", + "ringing": "ringing", + "answered": "answered", + "completed": "completed", + "failed": "failed", + "busy": "busy", + "no-answer": "no-answer", + "canceled": "canceled", + } + + call_status = data.get("status", "") + mapped_status = status_map.get(call_status.lower(), call_status) + + return { + "call_id": data.get("token") + or data.get("session_id") + or data.get("CallSid", ""), + "status": mapped_status, + "from_number": data.get("caller_id") or data.get("From"), + "to_number": data.get("destination") or data.get("To"), + "direction": data.get("direction"), + "duration": data.get("duration") or data.get("CallDuration"), + "extra": data, # Include all original data + } + + async def get_webhook_response( + self, workflow_id: int, user_id: int, workflow_run_id: int + ) -> str: + """ + Dummy implementation - Cloudonix doesn't use webhook responses. + + Cloudonix embeds CXML directly in the API call during initiate_call(), + so this webhook endpoint is never actually called. This method only + exists to satisfy the abstract base class requirement. + """ + logger.warning( + "get_webhook_response called for Cloudonix - this should not happen. " + "Cloudonix embeds CXML directly in API calls." + ) + return """ + + Error: This endpoint should not be called for Cloudonix +""" + + async def handle_websocket( + self, + websocket: "WebSocket", + workflow_id: int, + user_id: int, + workflow_run_id: int, + ) -> None: + """ + Handle Cloudonix-specific WebSocket connection. + + Cloudonix WebSocket is compatible with Twilio, so we use the same handler. + Cloudonix sends: + 1. "connected" event first + 2. "start" event with streamSid and callSid + 3. Then audio messages + """ + from api.services.pipecat.run_pipeline import run_pipeline_cloudonix + + try: + # Wait for "connected" event + first_msg = await websocket.receive_text() + msg = json.loads(first_msg) + + if msg.get("event") != "connected": + logger.error(f"Expected 'connected' event, got: {msg.get('event')}") + await websocket.close(code=4400, reason="Expected connected event") + return + + logger.debug( + f"Cloudonix WebSocket connected for workflow_run {workflow_run_id}" + ) + + # Wait for "start" event with stream details + start_msg = await websocket.receive_text() + logger.debug(f"Received start message: {start_msg}") + + start_msg = json.loads(start_msg) + if start_msg.get("event") != "start": + logger.error("Expected 'start' event second") + await websocket.close(code=4400, reason="Expected start event") + return + + # Extract Twilio-compatible identifiers + try: + stream_sid = start_msg["start"]["streamSid"] + call_sid = start_msg["start"]["callSid"] + except KeyError: + logger.error("Missing streamSid or callSid in start message") + await websocket.close(code=4400, reason="Missing stream identifiers") + return + + # Run the Cloudonix pipeline + await run_pipeline_cloudonix( + websocket, stream_sid, call_sid, workflow_id, workflow_run_id, user_id + ) + + except Exception as e: + logger.error(f"Error in Cloudonix WebSocket handler: {e}") + raise diff --git a/pipecat b/pipecat index a66062d..07626c6 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit a66062d3e7bbf620295cebf0956d4ba86de6a507 +Subproject commit 07626c642653a18db70a50d097cac04b58f3a54e diff --git a/ui/Dockerfile b/ui/Dockerfile index 18914a8..91acdb5 100644 --- a/ui/Dockerfile +++ b/ui/Dockerfile @@ -45,6 +45,8 @@ ENV NEXT_PUBLIC_CHATWOOT_URL="https://chat.dograh.com" ENV NEXT_PUBLIC_CHATWOOT_TOKEN="3fkFx2mCEjNHjM9gaNc4A82X" # Build the application with standalone mode +# Increase Node.js heap size to prevent out-of-memory errors during build +ENV NODE_OPTIONS="--max-old-space-size=4096" RUN npm run build && \ rm -rf /tmp/* /root/.npm /root/.next/cache diff --git a/ui/src/app/telephony-configurations/page.tsx b/ui/src/app/telephony-configurations/page.tsx index 850b871..d171611 100644 --- a/ui/src/app/telephony-configurations/page.tsx +++ b/ui/src/app/telephony-configurations/page.tsx @@ -1,11 +1,19 @@ "use client"; +import { useRouter, useSearchParams } from "next/navigation"; import { useEffect, useState } from "react"; import { useForm } from "react-hook-form"; import { toast } from "sonner"; import { getTelephonyConfigurationApiV1OrganizationsTelephonyConfigGet, saveTelephonyConfigurationApiV1OrganizationsTelephonyConfigPost } from "@/client/sdk.gen"; -import type { TwilioConfigurationRequest, VobizConfigurationRequest,VonageConfigurationRequest } from "@/client/types.gen"; +import type { + CloudonixConfigurationRequest, + CloudonixConfigurationResponse, + TelephonyConfigurationResponse, + TwilioConfigurationRequest, + VobizConfigurationRequest, + VonageConfigurationRequest +} from "@/client/types.gen"; import { Button } from "@/components/ui/button"; import { Card, @@ -39,19 +47,22 @@ interface TelephonyConfigForm { // Vobiz fields auth_id?: string; vobiz_auth_token?: string; + // Cloudonix fields + bearer_token?: string; + domain_id?: string; // Common field from_number: string; } export default function ConfigureTelephonyPage() { + const router = useRouter(); + const searchParams = useSearchParams(); const { user, getAccessToken, loading: authLoading } = useAuth(); const [isLoading, setIsLoading] = useState(false); const [hasExistingConfig, setHasExistingConfig] = useState(false); - // Clean up any stale pointer-events from dialogs that weren't properly closed before navigation - useEffect(() => { - document.body.style.pointerEvents = ''; - }, []); + // Get returnTo parameter from URL + const returnTo = searchParams.get("returnTo") || "/workflow"; const { register, @@ -109,6 +120,15 @@ export default function ConfigureTelephonyPage() { if (response.data.vobiz.from_numbers?.length > 0) { setValue("from_number", response.data.vobiz.from_numbers[0]); } + } else if ((response.data as TelephonyConfigurationResponse)?.cloudonix) { + const cloudonixConfig = (response.data as TelephonyConfigurationResponse).cloudonix as CloudonixConfigurationResponse; + setHasExistingConfig(true); + setValue("provider", "cloudonix"); + setValue("bearer_token", cloudonixConfig.bearer_token); + setValue("domain_id", cloudonixConfig.domain_id); + if (cloudonixConfig.from_numbers?.length > 0) { + setValue("from_number", cloudonixConfig.from_numbers[0]); + } } } } catch (error) { @@ -126,7 +146,11 @@ export default function ConfigureTelephonyPage() { const accessToken = await getAccessToken(); // Build the request body based on provider - let requestBody: TwilioConfigurationRequest | VonageConfigurationRequest | VobizConfigurationRequest; + let requestBody: + | TwilioConfigurationRequest + | VonageConfigurationRequest + | VobizConfigurationRequest + | CloudonixConfigurationRequest; if (data.provider === "twilio") { requestBody = { @@ -144,18 +168,26 @@ export default function ConfigureTelephonyPage() { api_key: data.api_key || undefined, api_secret: data.api_secret || undefined, } as VonageConfigurationRequest; - } else { + } else if (data.provider === "vobiz") { requestBody = { provider: data.provider, from_numbers: [data.from_number], auth_id: data.auth_id, auth_token: data.vobiz_auth_token, } as VobizConfigurationRequest; + } else { + // Cloudonix + requestBody = { + provider: data.provider, + from_numbers: data.from_number ? [data.from_number] : [], + bearer_token: data.bearer_token!, + domain_id: data.domain_id!, + } as CloudonixConfigurationRequest; } const response = await saveTelephonyConfigurationApiV1OrganizationsTelephonyConfigPost({ headers: { Authorization: `Bearer ${accessToken}` }, - body: requestBody, + body: requestBody }); if (response.error) { @@ -166,6 +198,9 @@ export default function ConfigureTelephonyPage() { } toast.success("Telephony configuration saved successfully"); + + // Redirect back to the page that sent us here + router.push(returnTo); } catch (error) { toast.error( error instanceof Error @@ -178,40 +213,127 @@ export default function ConfigureTelephonyPage() { }; return ( -
-
+
+

Configure Telephony

-

+

Set up your telephony provider to make phone calls

-
-
+
- {selectedProvider === "twilio" ? "Twilio" : "Vonage"} Setup Guide + {selectedProvider === "twilio" + ? "Twilio" + : selectedProvider === "vonage" + ? "Vonage" + : selectedProvider === "vobiz" + ? "Vobiz" + : "Cloudonix"}{" "} + Setup Guide - Watch this video to learn how to setup {selectedProvider === "twilio" ? "Twilio" : "Vonage"} + {selectedProvider === "cloudonix" ? ( + <> + Cloudonix is an AI Connectivity platform, enabling you to connect Dograh to any SIP product or SIP Telephony Provider.

+