diff --git a/api/alembic/versions/6d2f94baf4b7_add_ari_mode.py b/api/alembic/versions/6d2f94baf4b7_add_ari_mode.py index 90970106..3173e2d3 100644 --- a/api/alembic/versions/6d2f94baf4b7_add_ari_mode.py +++ b/api/alembic/versions/6d2f94baf4b7_add_ari_mode.py @@ -5,15 +5,15 @@ Revises: 1a7d74d54e8f Create Date: 2026-02-15 13:52:29.285583 """ + from typing import Sequence, Union from alembic import op -import sqlalchemy as sa from alembic_postgresql_enum import TableReference # revision identifiers, used by Alembic. -revision: str = '6d2f94baf4b7' -down_revision: Union[str, None] = '1a7d74d54e8f' +revision: str = "6d2f94baf4b7" +down_revision: Union[str, None] = "1a7d74d54e8f" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None @@ -21,10 +21,25 @@ depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### op.sync_enum_values( - enum_schema='public', - enum_name='workflow_run_mode', - new_values=['ari', 'twilio', 'vonage', 'vobiz', 'cloudonix', 'webrtc', 'smallwebrtc', 'stasis', 'VOICE', 'CHAT'], - affected_columns=[TableReference(table_schema='public', table_name='workflow_runs', column_name='mode')], + enum_schema="public", + enum_name="workflow_run_mode", + new_values=[ + "ari", + "twilio", + "vonage", + "vobiz", + "cloudonix", + "webrtc", + "smallwebrtc", + "stasis", + "VOICE", + "CHAT", + ], + affected_columns=[ + TableReference( + table_schema="public", table_name="workflow_runs", column_name="mode" + ) + ], enum_values_to_rename=[], ) # ### end Alembic commands ### @@ -33,10 +48,24 @@ def upgrade() -> None: def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### op.sync_enum_values( - enum_schema='public', - enum_name='workflow_run_mode', - new_values=['twilio', 'vonage', 'vobiz', 'cloudonix', 'stasis', 'webrtc', 'smallwebrtc', 'VOICE', 'CHAT'], - affected_columns=[TableReference(table_schema='public', table_name='workflow_runs', column_name='mode')], + enum_schema="public", + enum_name="workflow_run_mode", + new_values=[ + "twilio", + "vonage", + "vobiz", + "cloudonix", + "stasis", + "webrtc", + "smallwebrtc", + "VOICE", + "CHAT", + ], + affected_columns=[ + TableReference( + table_schema="public", table_name="workflow_runs", column_name="mode" + ) + ], enum_values_to_rename=[], ) # ### end Alembic commands ### diff --git a/api/routes/organization.py b/api/routes/organization.py index c13913cb..7d4a10a5 100644 --- a/api/routes/organization.py +++ b/api/routes/organization.py @@ -132,6 +132,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)): ari_endpoint = config.value.get("ari_endpoint", "") app_name = config.value.get("app_name", "") app_password = config.value.get("app_password", "") + ws_client_name = config.value.get("ws_client_name", "") from_numbers = config.value.get("from_numbers", []) return TelephonyConfigurationResponse( @@ -140,6 +141,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)): ari_endpoint=ari_endpoint, app_name=app_name, app_password=mask_key(app_password) if app_password else "", + ws_client_name=ws_client_name, from_numbers=from_numbers, ), ) @@ -205,6 +207,7 @@ async def save_telephony_configuration( "ari_endpoint": request.ari_endpoint, "app_name": request.app_name, "app_password": request.app_password, + "ws_client_name": request.ws_client_name, "from_numbers": request.from_numbers, } else: diff --git a/api/routes/telephony.py b/api/routes/telephony.py index 2a69cb0f..905b68c2 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -523,13 +523,47 @@ async def handle_ncco_webhook( return json.loads(response_content) +@router.websocket("/ws/ari") +async def websocket_ari_endpoint(websocket: WebSocket): + """WebSocket endpoint for ARI chan_websocket external media. + + Asterisk connects here via chan_websocket. Routing params are passed as + query params (appended by the v() dial string option in externalMedia). + """ + workflow_id = websocket.query_params.get("workflow_id") + user_id = websocket.query_params.get("user_id") + workflow_run_id = websocket.query_params.get("workflow_run_id") + + if not workflow_id or not user_id or not workflow_run_id: + logger.error( + f"ARI WebSocket missing query params: " + f"workflow_id={workflow_id}, user_id={user_id}, workflow_run_id={workflow_run_id}" + ) + await websocket.close(code=4400, reason="Missing required query params") + return + + # Accept with "media" subprotocol — chan_websocket sends + # Sec-WebSocket-Protocol: media and requires it echoed back. + await websocket.accept(subprotocol="media") + + await _handle_telephony_websocket( + websocket, int(workflow_id), int(user_id), int(workflow_run_id) + ) + + @router.websocket("/ws/{workflow_id}/{user_id}/{workflow_run_id}") async def websocket_endpoint( websocket: WebSocket, workflow_id: int, user_id: int, workflow_run_id: int ): """WebSocket endpoint for real-time call handling - routes to provider-specific handlers.""" await websocket.accept() + await _handle_telephony_websocket(websocket, workflow_id, user_id, workflow_run_id) + +async def _handle_telephony_websocket( + websocket: WebSocket, workflow_id: int, user_id: int, workflow_run_id: int +): + """Shared WebSocket handler logic (connection already accepted).""" try: # Set the run context set_current_run_id(workflow_run_id) diff --git a/api/schemas/telephony_config.py b/api/schemas/telephony_config.py index 7fd5f728..72c248f2 100644 --- a/api/schemas/telephony_config.py +++ b/api/schemas/telephony_config.py @@ -100,6 +100,10 @@ class ARIConfigurationRequest(BaseModel): ..., description="Stasis application name registered in Asterisk" ) app_password: str = Field(..., description="ARI user password") + ws_client_name: str = Field( + default="", + description="websocket_client.conf connection name for externalMedia (e.g., dograh_staging)", + ) from_numbers: List[str] = Field( default_factory=list, description="List of SIP extensions/numbers for outbound calls (optional)", @@ -113,6 +117,7 @@ class ARIConfigurationResponse(BaseModel): ari_endpoint: str app_name: str app_password: str # Masked + ws_client_name: str = "" from_numbers: List[str] diff --git a/api/services/pipecat/audio_config.py b/api/services/pipecat/audio_config.py index f2ff2d5c..0a5bd547 100644 --- a/api/services/pipecat/audio_config.py +++ b/api/services/pipecat/audio_config.py @@ -96,8 +96,9 @@ def create_audio_config(transport_type: str) -> AudioConfig: WorkflowRunMode.TWILIO.value, WorkflowRunMode.VOBIZ.value, WorkflowRunMode.CLOUDONIX.value, + WorkflowRunMode.ARI.value, ): - # Twilio, Cloudonix, and Vobiz use MULAW at 8kHz + # Twilio, Cloudonix, Vobiz, and ARI use MULAW at 8kHz return AudioConfig( transport_in_sample_rate=8000, transport_out_sample_rate=8000, diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 6b7cb4c5..6939c43b 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -32,6 +32,7 @@ from api.services.pipecat.service_factory import ( ) from api.services.pipecat.tracing_config import setup_pipeline_tracing from api.services.pipecat.transport_setup import ( + create_ari_transport, create_cloudonix_transport, create_twilio_transport, create_vobiz_transport, @@ -197,6 +198,63 @@ async def run_pipeline_vonage( raise +async def run_pipeline_ari( + websocket_client: WebSocket, + channel_id: str, + workflow_id: int, + workflow_run_id: int, + user_id: int, +) -> None: + """Run pipeline for Asterisk ARI WebSocket connections. + + ARI uses raw 16-bit signed linear PCM (SLIN16) at 16kHz + transmitted as binary WebSocket frames via chan_websocket. + """ + logger.info(f"Starting ARI pipeline for workflow run {workflow_run_id}") + set_current_run_id(workflow_run_id) + + # Store call ID (channel_id) in cost_info + cost_info = {"call_id": channel_id} + await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info) + + # Get workflow to extract configurations + workflow = await db_client.get_workflow(workflow_id, user_id) + vad_config = None + ambient_noise_config = None + if workflow and workflow.workflow_configurations: + if "vad_configuration" in workflow.workflow_configurations: + vad_config = workflow.workflow_configurations["vad_configuration"] + if "ambient_noise_configuration" in workflow.workflow_configurations: + ambient_noise_config = workflow.workflow_configurations[ + "ambient_noise_configuration" + ] + + try: + audio_config = create_audio_config(WorkflowRunMode.ARI.value) + + transport = await create_ari_transport( + websocket_client, + channel_id, + workflow_run_id, + audio_config, + workflow.organization_id, + vad_config, + ambient_noise_config, + ) + + await _run_pipeline( + transport, + workflow_id, + workflow_run_id, + user_id, + audio_config=audio_config, + ) + + except Exception as e: + logger.error(f"Error in ARI pipeline: {e}") + raise + + async def run_pipeline_vobiz( websocket_client: WebSocket, stream_id: str, diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py index 2c9ec0a5..0b2f356f 100644 --- a/api/services/pipecat/transport_setup.py +++ b/api/services/pipecat/transport_setup.py @@ -8,6 +8,7 @@ from api.enums import OrganizationConfigurationKey from api.services.pipecat.audio_config import AudioConfig from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer +from pipecat.serializers.asterisk import AsteriskFrameSerializer from pipecat.serializers.twilio import TwilioFrameSerializer from pipecat.serializers.vobiz import VobizFrameSerializer from pipecat.serializers.vonage import VonageFrameSerializer @@ -149,6 +150,70 @@ async def create_cloudonix_transport( ) +async def create_ari_transport( + websocket_client: WebSocket, + channel_id: str, + workflow_run_id: int, + audio_config: AudioConfig, + organization_id: int, + vad_config: dict | None = None, + ambient_noise_config: dict | None = None, +): + """Create a transport for Asterisk ARI connections""" + + from api.services.telephony.factory import load_telephony_config + + config = await load_telephony_config(organization_id) + + if config.get("provider") != "ari": + raise ValueError(f"Expected ARI provider, got {config.get('provider')}") + + ari_endpoint = config.get("ari_endpoint") + app_name = config.get("app_name") + app_password = config.get("app_password") + + if not ari_endpoint or not app_name or not app_password: + raise ValueError( + f"Incomplete ARI configuration for organization {organization_id}. " + f"Required: ari_endpoint, app_name, app_password" + ) + + serializer = AsteriskFrameSerializer( + channel_id=channel_id, + ari_endpoint=ari_endpoint, + app_name=app_name, + app_password=app_password, + params=AsteriskFrameSerializer.InputParams( + asterisk_sample_rate=audio_config.transport_in_sample_rate, + sample_rate=audio_config.pipeline_sample_rate, + ), + ) + + return FastAPIWebsocketTransport( + websocket=websocket_client, + params=FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + audio_in_sample_rate=audio_config.transport_in_sample_rate, + audio_out_sample_rate=audio_config.transport_out_sample_rate, + audio_out_mixer=( + SoundfileMixer( + sound_files={ + "office": APP_ROOT_DIR + / "assets" + / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav" + }, + default_sound="office", + volume=ambient_noise_config.get("volume", 0.3), + ) + if ambient_noise_config and ambient_noise_config.get("enabled", False) + else SilenceAudioMixer() + ), + serializer=serializer, + ), + ) + + async def create_vonage_transport( websocket_client, call_uuid: str, diff --git a/api/services/telephony/ari_manager.py b/api/services/telephony/ari_manager.py index 4c690535..aa71e6fd 100644 --- a/api/services/telephony/ari_manager.py +++ b/api/services/telephony/ari_manager.py @@ -11,19 +11,25 @@ Standalone process that: from api.logging_config import setup_logging setup_logging() - import asyncio import json import signal -from typing import Any, Dict, Optional, Set +from typing import Dict, Optional, Set from urllib.parse import urlparse +import aiohttp +import redis.asyncio as aioredis import websockets from loguru import logger +from api.constants import REDIS_URL from api.db import db_client from api.enums import OrganizationConfigurationKey +# Redis key pattern and TTL for channel-to-run mapping +_CHANNEL_KEY_PREFIX = "ari:channel:" +_CHANNEL_KEY_TTL = 3600 # 1 hour safety expiry + class ARIConnection: """Manages a single ARI WebSocket connection for an organization.""" @@ -34,11 +40,13 @@ class ARIConnection: ari_endpoint: str, app_name: str, app_password: str, + ws_client_name: str = "", ): self.organization_id = organization_id self.ari_endpoint = ari_endpoint.rstrip("/") self.app_name = app_name self.app_password = app_password + self.ws_client_name = ws_client_name self._ws: Optional[websockets.ClientConnection] = None self._task: Optional[asyncio.Task] = None @@ -47,6 +55,39 @@ class ARIConnection: self._max_reconnect_delay = 300 # Max 300 seconds self._ping_interval = 30 # Send ping every 30 seconds + # Redis client for channel-to-run reverse mapping (lazy init) + self._redis_client: Optional[aioredis.Redis] = None + + async def _get_redis(self) -> aioredis.Redis: + """Get Redis client instance (lazy init).""" + if not self._redis_client: + self._redis_client = await aioredis.from_url( + REDIS_URL, decode_responses=True + ) + return self._redis_client + + async def _set_channel_run(self, channel_id: str, workflow_run_id: str): + """Store channel_id -> workflow_run_id mapping in Redis.""" + r = await self._get_redis() + await r.set( + f"{_CHANNEL_KEY_PREFIX}{channel_id}", + workflow_run_id, + ex=_CHANNEL_KEY_TTL, + ) + + async def _get_channel_run(self, channel_id: str) -> Optional[str]: + """Look up workflow_run_id for a channel_id from Redis.""" + r = await self._get_redis() + return await r.get(f"{_CHANNEL_KEY_PREFIX}{channel_id}") + + async def _delete_channel_run(self, *channel_ids: str): + """Delete channel-to-run mapping(s) from Redis.""" + if not channel_ids: + return + r = await self._get_redis() + keys = [f"{_CHANNEL_KEY_PREFIX}{cid}" for cid in channel_ids] + await r.delete(*keys) + @property def ws_url(self) -> str: """Build the ARI WebSocket URL.""" @@ -178,14 +219,42 @@ class ARIConnection: f"caller={caller.get('number', 'unknown')}, " f"args={app_args}" ) - # TODO: This is where we'll integrate with the pipeline - # For now, just log the event + + # Parse args to extract workflow context + args_dict = {} + for arg in app_args: + for pair in arg.split(","): + if "=" in pair: + key, value = pair.split("=", 1) + args_dict[key.strip()] = value.strip() + + workflow_run_id = args_dict.get("workflow_run_id") + workflow_id = args_dict.get("workflow_id") + user_id = args_dict.get("user_id") + + if not workflow_run_id or not workflow_id or not user_id: + logger.warning( + f"[ARI org={self.organization_id}] StasisStart missing required args: " + f"workflow_run_id={workflow_run_id}, workflow_id={workflow_id}, user_id={user_id}" + ) + return + + # Start pipeline connection in background task + asyncio.create_task( + self._handle_stasis_start( + channel_id, channel_state, workflow_run_id, workflow_id, user_id + ) + ) elif event_type == "StasisEnd": logger.info( - f"[ARI org={self.organization_id}] StasisEnd: " - f"channel={channel_id}" + f"[ARI org={self.organization_id}] StasisEnd: channel={channel_id}" ) + workflow_run_id = await self._get_channel_run(channel_id) + if workflow_run_id: + asyncio.create_task( + self._handle_stasis_end(channel_id, workflow_run_id) + ) elif event_type == "ChannelStateChange": logger.debug( @@ -214,6 +283,255 @@ class ARIConnection: f"channel={channel_id}" ) + async def _ari_request(self, method: str, path: str, **kwargs) -> dict: + """Make an ARI REST API request.""" + + url = f"{self.ari_endpoint}/ari{path}" + auth = aiohttp.BasicAuth(self.app_name, self.app_password) + + async with aiohttp.ClientSession() as session: + async with session.request(method, url, auth=auth, **kwargs) as response: + response_text = await response.text() + if response.status not in (200, 201, 204): + logger.error( + f"[ARI org={self.organization_id}] REST API error: " + f"{method} {path} -> {response.status}: {response_text}" + ) + return {} + if response_text: + return json.loads(response_text) + return {} + + async def _answer_channel(self, channel_id: str) -> bool: + """Answer an ARI channel.""" + await self._ari_request("POST", f"/channels/{channel_id}/answer") + # answer returns 204 No Content on success, so empty dict is OK + logger.info(f"[ARI org={self.organization_id}] Answered channel {channel_id}") + return True + + async def _create_external_media( + self, + workflow_id: str, + user_id: str, + workflow_run_id: str, + ) -> str: + """Create an external media channel via chan_websocket. + + Uses ARI externalMedia with transport=websocket so Asterisk connects + to our backend over WebSocket (via websocket_client.conf). + Dynamic routing params are passed as URI query params via v() in transport_data. + """ + # v() appends URI query params to the websocket_client.conf URL + # e.g. wss://api.dograh.com/ws/ari?workflow_id=1&user_id=2&workflow_run_id=3 + transport_data = ( + f"v(workflow_id={workflow_id}," + f"user_id={user_id}," + f"workflow_run_id={workflow_run_id})" + ) + + result = await self._ari_request( + "POST", + "/channels/externalMedia", + params={ + "app": self.app_name, + "external_host": self.ws_client_name, + "format": "ulaw", + "transport": "websocket", + "encapsulation": "none", + "connection_type": "client", + "direction": "both", + "transport_data": transport_data, + }, + ) + ext_channel_id = result.get("id", "") + if ext_channel_id: + logger.info( + f"[ARI org={self.organization_id}] Created external media channel: {ext_channel_id}" + ) + return ext_channel_id + + async def _create_bridge_and_add_channels(self, channel_ids: list) -> str: + """Create a bridge and add channels to it.""" + # Create bridge + bridge_result = await self._ari_request( + "POST", + "/bridges", + params={"type": "mixing", "name": f"bridge-{channel_ids[0]}"}, + ) + bridge_id = bridge_result.get("id", "") + if not bridge_id: + logger.error(f"[ARI org={self.organization_id}] Failed to create bridge") + return "" + + # Add channels to bridge + await self._ari_request( + "POST", + f"/bridges/{bridge_id}/addChannel", + params={"channel": ",".join(channel_ids)}, + ) + logger.info( + f"[ARI org={self.organization_id}] Bridge {bridge_id} created with channels: {channel_ids}" + ) + return bridge_id + + async def _handle_stasis_start( + self, + channel_id: str, + channel_state: str, + workflow_run_id: str, + workflow_id: str, + user_id: str, + ): + """Handle StasisStart by answering (if needed), creating external media, and bridging.""" + try: + # 1. Only answer the channel if it's not already up + # For outbound calls, the channel enters Stasis in "Up" state + # after the remote party answers — no need to answer again. + # For inbound calls, the channel may be in "Ring" state. + if channel_state != "Up": + await self._answer_channel(channel_id) + + logger.info( + f"[ARI org={self.organization_id}] Setting up external media for " + f"channel {channel_id} via ws_client={self.ws_client_name}" + ) + + # 2. Track channel for StasisEnd cleanup (Redis) + await self._set_channel_run(channel_id, workflow_run_id) + + # 3. Create external media channel via chan_websocket + # Asterisk connects to our backend using websocket_client.conf config, + # with routing params appended as URI query params via v() + ext_channel_id = await self._create_external_media( + workflow_id, user_id, workflow_run_id + ) + if not ext_channel_id: + logger.error( + f"[ARI org={self.organization_id}] Failed to create external media for {channel_id}" + ) + return + + # 4. Track ext channel for StasisEnd cleanup (Redis) + await self._set_channel_run(ext_channel_id, workflow_run_id) + + # 5. Bridge the call channel with the external media channel + bridge_id = await self._create_bridge_and_add_channels( + [channel_id, ext_channel_id] + ) + if not bridge_id: + logger.error( + f"[ARI org={self.organization_id}] Failed to bridge channels" + ) + return + + # 6. Store ARI resource IDs in gathered_context for cleanup/debugging + await db_client.update_workflow_run( + run_id=int(workflow_run_id), + gathered_context={ + "ext_channel_id": ext_channel_id, + "bridge_id": bridge_id, + }, + ) + except Exception as e: + logger.error( + f"[ARI org={self.organization_id}] Error handling StasisStart " + f"for channel {channel_id}: {e}" + ) + + async def _handle_stasis_end(self, channel_id: str, workflow_run_id: str): + """Full teardown of all ARI resources on any channel's StasisEnd. + + When either channel (call or ext) fires StasisEnd, we tear down + the bridge and both channels — like endConferenceOnExit. + """ + try: + workflow_run = await db_client.get_workflow_run_by_id(int(workflow_run_id)) + if not workflow_run or not workflow_run.gathered_context: + logger.warning( + f"[ARI org={self.organization_id}] StasisEnd: no gathered_context " + f"for workflow_run {workflow_run_id}" + ) + # Still clean up the Redis key for the channel that ended + await self._delete_channel_run(channel_id) + return + + ctx = workflow_run.gathered_context + call_id = ctx.get("call_id") + ext_channel_id = ctx.get("ext_channel_id") + bridge_id = ctx.get("bridge_id") + + # Delete the bridge first (removes channels from it) + if bridge_id: + await self._delete_bridge(bridge_id) + + # Destroy both channels, skipping the one that already ended + for cid in (call_id, ext_channel_id): + if cid and cid != channel_id: + await self._delete_channel(cid) + + # Clean up all Redis reverse-mapping keys + keys_to_delete = [ + cid for cid in (call_id, ext_channel_id, channel_id) if cid + ] + if keys_to_delete: + await self._delete_channel_run(*keys_to_delete) + + logger.info( + f"[ARI org={self.organization_id}] StasisEnd full teardown for " + f"channel={channel_id}, call={call_id}, ext={ext_channel_id}, bridge={bridge_id}" + ) + except Exception as e: + logger.error( + f"[ARI org={self.organization_id}] Error cleaning up StasisEnd " + f"for channel {channel_id}: {e}" + ) + + async def _delete_bridge(self, bridge_id: str): + """Delete an ARI bridge. Ignores 404 (already gone).""" + + url = f"{self.ari_endpoint}/ari/bridges/{bridge_id}" + auth = aiohttp.BasicAuth(self.app_name, self.app_password) + + async with aiohttp.ClientSession() as session: + async with session.delete(url, auth=auth) as response: + if response.status in (200, 204): + logger.info( + f"[ARI org={self.organization_id}] Deleted bridge {bridge_id}" + ) + elif response.status == 404: + logger.debug( + f"[ARI org={self.organization_id}] Bridge {bridge_id} already gone" + ) + else: + text = await response.text() + logger.error( + f"[ARI org={self.organization_id}] Failed to delete bridge {bridge_id}: " + f"{response.status} {text}" + ) + + async def _delete_channel(self, channel_id: str): + """Delete (hang up) an ARI channel. Ignores 404 (already gone).""" + + url = f"{self.ari_endpoint}/ari/channels/{channel_id}" + auth = aiohttp.BasicAuth(self.app_name, self.app_password) + + async with aiohttp.ClientSession() as session: + async with session.delete(url, auth=auth) as response: + if response.status in (200, 204): + logger.info( + f"[ARI org={self.organization_id}] Deleted channel {channel_id}" + ) + elif response.status == 404: + logger.debug( + f"[ARI org={self.organization_id}] Channel {channel_id} already gone" + ) + else: + text = await response.text() + logger.error( + f"[ARI org={self.organization_id}] Failed to delete channel {channel_id}: " + f"{response.status} {text}" + ) + class ARIManager: """Manages ARI WebSocket connections for all organizations.""" @@ -269,8 +587,11 @@ class ARIManager: ari_endpoint = config["ari_endpoint"] app_name = config["app_name"] app_password = config["app_password"] + ws_client_name = config["ws_client_name"] - conn = ARIConnection(org_id, ari_endpoint, app_name, app_password) + conn = ARIConnection( + org_id, ari_endpoint, app_name, app_password, ws_client_name + ) key = conn.connection_key active_keys.add(key) @@ -324,6 +645,7 @@ class ARIManager: ari_endpoint = value.get("ari_endpoint") app_name = value.get("app_name") app_password = value.get("app_password") + ws_client_name = value.get("ws_client_name", "") if not all([ari_endpoint, app_name, app_password]): logger.warning( @@ -331,12 +653,19 @@ class ARIManager: ) continue + if not ws_client_name: + logger.warning( + f"[ARI Manager] Missing ws_client_name for org {org_id}, " + f"externalMedia WebSocket won't work" + ) + configs.append( { "organization_id": org_id, "ari_endpoint": ari_endpoint, "app_name": app_name, "app_password": app_password, + "ws_client_name": ws_client_name, } ) diff --git a/api/services/telephony/factory.py b/api/services/telephony/factory.py index 2829169d..ac03d6fb 100644 --- a/api/services/telephony/factory.py +++ b/api/services/telephony/factory.py @@ -139,4 +139,10 @@ async def get_all_telephony_providers() -> List[Type[TelephonyProvider]]: Returns: List of provider classes that can be used for webhook detection """ - return [ARIProvider, CloudonixProvider, TwilioProvider, VobizProvider, VonageProvider] + return [ + ARIProvider, + CloudonixProvider, + TwilioProvider, + VobizProvider, + VonageProvider, + ] diff --git a/api/services/telephony/providers/ari_provider.py b/api/services/telephony/providers/ari_provider.py index 28a90c68..67b449c4 100644 --- a/api/services/telephony/providers/ari_provider.py +++ b/api/services/telephony/providers/ari_provider.py @@ -13,6 +13,7 @@ import aiohttp from fastapi import HTTPException from loguru import logger +from api.db import db_client from api.enums import WorkflowRunMode from api.services.telephony.base import ( CallInitiationResult, @@ -92,25 +93,21 @@ class ARIProvider(TelephonyProvider): params = { "endpoint": sip_endpoint, "app": self.app_name, - "appArgs": f"workflow_run_id={workflow_run_id}" if workflow_run_id else "", + "appArgs": ",".join( + filter( + None, + [ + f"workflow_run_id={workflow_run_id}", + f"workflow_id={kwargs.get('workflow_id', '')}", + f"user_id={kwargs.get('user_id', '')}", + ], + ) + ), } if from_number: params["callerId"] = from_number - # Add variables for tracking - variables = {} - if workflow_run_id: - variables["WORKFLOW_RUN_ID"] = str(workflow_run_id) - if kwargs.get("workflow_id"): - variables["WORKFLOW_ID"] = str(kwargs["workflow_id"]) - if kwargs.get("user_id"): - variables["USER_ID"] = str(kwargs["user_id"]) - - data = {} - if variables: - data["variables"] = variables - logger.info( f"[ARI] Initiating call to {sip_endpoint} " f"via app={self.app_name}, workflow_run_id={workflow_run_id}" @@ -120,7 +117,6 @@ class ARIProvider(TelephonyProvider): async with session.post( endpoint, params=params, - json=data if data else None, auth=self._get_auth(), ) as response: response_text = await response.text() @@ -248,17 +244,25 @@ class ARIProvider(TelephonyProvider): workflow_run_id: int, ) -> None: """ - ARI WebSocket handling is done by the ari_manager process. - This method is a placeholder for the base class requirement. + Handle WebSocket connection from ARI externalMedia channel. - TODO: Implement pipeline integration when ready. + Unlike Twilio (which sends "connected" and "start" JSON messages), + Asterisk chan_websocket starts streaming audio immediately. """ - logger.warning( - f"handle_websocket called for ARI provider - " - f"pipeline integration not yet implemented for workflow_run {workflow_run_id}" + from api.services.pipecat.run_pipeline import run_pipeline_ari + + # Get channel_id from workflow run context + workflow_run = await db_client.get_workflow_run(workflow_run_id, user_id) + channel_id = "" + if workflow_run and workflow_run.gathered_context: + channel_id = workflow_run.gathered_context.get("call_id", "") + + logger.info( + f"[ARI] Starting pipeline for workflow_run {workflow_run_id}, channel={channel_id}" ) - await websocket.close( - code=4501, reason="ARI pipeline integration not yet implemented" + + await run_pipeline_ari( + websocket, channel_id, workflow_id, workflow_run_id, user_id ) # ======== INBOUND CALL METHODS ======== @@ -329,6 +333,7 @@ class ARIProvider(TelephonyProvider): def generate_validation_error_response(error_type) -> tuple: """Generate JSON error response for validation failures.""" from fastapi import Response + from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError message = TELEPHONY_ERROR_MESSAGES.get( @@ -388,9 +393,7 @@ class ARIProvider(TelephonyProvider): try: async with aiohttp.ClientSession() as session: - async with session.post( - endpoint, auth=self._get_auth() - ) as response: + async with session.post(endpoint, auth=self._get_auth()) as response: if response.status in (200, 204): logger.info(f"[ARI] Channel {channel_id} answered") return True diff --git a/ui/src/app/telephony-configurations/page.tsx b/ui/src/app/telephony-configurations/page.tsx index 77b94a1f..a02ca978 100644 --- a/ui/src/app/telephony-configurations/page.tsx +++ b/ui/src/app/telephony-configurations/page.tsx @@ -57,6 +57,7 @@ interface TelephonyConfigForm { ari_endpoint?: string; app_name?: string; app_password?: string; + ws_client_name?: string; // Common field - multiple phone numbers from_numbers: string[]; } @@ -153,6 +154,7 @@ export default function ConfigureTelephonyPage() { setValue("ari_endpoint", ariConfig.ari_endpoint); setValue("app_name", ariConfig.app_name); setValue("app_password", ariConfig.app_password); + setValue("ws_client_name", ariConfig.ws_client_name); setValue("from_numbers", ariConfig.from_numbers?.length > 0 ? ariConfig.from_numbers : [""]); } } @@ -254,6 +256,7 @@ export default function ConfigureTelephonyPage() { ari_endpoint: data.ari_endpoint!, app_name: data.app_name!, app_password: data.app_password!, + ws_client_name: data.ws_client_name || "", } as AriConfigurationRequest; } @@ -898,6 +901,18 @@ export default function ConfigureTelephonyPage() { )} +
+ + +

+ Connection name from Asterisk's websocket_client.conf for external media streaming +

+
+
{fromNumbers.map((number, index) => ( diff --git a/ui/src/app/workflow/[workflowId]/components/PhoneCallDialog.tsx b/ui/src/app/workflow/[workflowId]/components/PhoneCallDialog.tsx index 9d5ad7df..e7632c60 100644 --- a/ui/src/app/workflow/[workflowId]/components/PhoneCallDialog.tsx +++ b/ui/src/app/workflow/[workflowId]/components/PhoneCallDialog.tsx @@ -238,9 +238,14 @@ export const PhoneCallDialog = ({ {callLoading ? "Calling..." : "Start Call"} ) : ( - + <> + + + )}
diff --git a/ui/src/client/types.gen.ts b/ui/src/client/types.gen.ts index e5be62f4..fb93eb7d 100644 --- a/ui/src/client/types.gen.ts +++ b/ui/src/client/types.gen.ts @@ -36,6 +36,10 @@ export type AriConfigurationRequest = { * ARI user password */ app_password: string; + /** + * websocket_client.conf connection name for externalMedia (e.g., dograh_staging) + */ + ws_client_name?: string; /** * List of SIP extensions/numbers for outbound calls (optional) */ @@ -50,6 +54,7 @@ export type AriConfigurationResponse = { ari_endpoint: string; app_name: string; app_password: string; + ws_client_name?: string; from_numbers: Array; };