feat: add websocket configuration for ARI

This commit is contained in:
Abhishek Kumar 2026-02-16 11:56:52 +05:30
parent e0f43ccf27
commit 1821872f7a
13 changed files with 607 additions and 49 deletions

View file

@ -5,15 +5,15 @@ Revises: 1a7d74d54e8f
Create Date: 2026-02-15 13:52:29.285583
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic_postgresql_enum import TableReference
# revision identifiers, used by Alembic.
revision: str = '6d2f94baf4b7'
down_revision: Union[str, None] = '1a7d74d54e8f'
revision: str = "6d2f94baf4b7"
down_revision: Union[str, None] = "1a7d74d54e8f"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
@ -21,10 +21,25 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.sync_enum_values(
enum_schema='public',
enum_name='workflow_run_mode',
new_values=['ari', 'twilio', 'vonage', 'vobiz', 'cloudonix', 'webrtc', 'smallwebrtc', 'stasis', 'VOICE', 'CHAT'],
affected_columns=[TableReference(table_schema='public', table_name='workflow_runs', column_name='mode')],
enum_schema="public",
enum_name="workflow_run_mode",
new_values=[
"ari",
"twilio",
"vonage",
"vobiz",
"cloudonix",
"webrtc",
"smallwebrtc",
"stasis",
"VOICE",
"CHAT",
],
affected_columns=[
TableReference(
table_schema="public", table_name="workflow_runs", column_name="mode"
)
],
enum_values_to_rename=[],
)
# ### end Alembic commands ###
@ -33,10 +48,24 @@ def upgrade() -> None:
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.sync_enum_values(
enum_schema='public',
enum_name='workflow_run_mode',
new_values=['twilio', 'vonage', 'vobiz', 'cloudonix', 'stasis', 'webrtc', 'smallwebrtc', 'VOICE', 'CHAT'],
affected_columns=[TableReference(table_schema='public', table_name='workflow_runs', column_name='mode')],
enum_schema="public",
enum_name="workflow_run_mode",
new_values=[
"twilio",
"vonage",
"vobiz",
"cloudonix",
"stasis",
"webrtc",
"smallwebrtc",
"VOICE",
"CHAT",
],
affected_columns=[
TableReference(
table_schema="public", table_name="workflow_runs", column_name="mode"
)
],
enum_values_to_rename=[],
)
# ### end Alembic commands ###

View file

@ -132,6 +132,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
ari_endpoint = config.value.get("ari_endpoint", "")
app_name = config.value.get("app_name", "")
app_password = config.value.get("app_password", "")
ws_client_name = config.value.get("ws_client_name", "")
from_numbers = config.value.get("from_numbers", [])
return TelephonyConfigurationResponse(
@ -140,6 +141,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
ari_endpoint=ari_endpoint,
app_name=app_name,
app_password=mask_key(app_password) if app_password else "",
ws_client_name=ws_client_name,
from_numbers=from_numbers,
),
)
@ -205,6 +207,7 @@ async def save_telephony_configuration(
"ari_endpoint": request.ari_endpoint,
"app_name": request.app_name,
"app_password": request.app_password,
"ws_client_name": request.ws_client_name,
"from_numbers": request.from_numbers,
}
else:

View file

@ -523,13 +523,47 @@ async def handle_ncco_webhook(
return json.loads(response_content)
@router.websocket("/ws/ari")
async def websocket_ari_endpoint(websocket: WebSocket):
"""WebSocket endpoint for ARI chan_websocket external media.
Asterisk connects here via chan_websocket. Routing params are passed as
query params (appended by the v() dial string option in externalMedia).
"""
workflow_id = websocket.query_params.get("workflow_id")
user_id = websocket.query_params.get("user_id")
workflow_run_id = websocket.query_params.get("workflow_run_id")
if not workflow_id or not user_id or not workflow_run_id:
logger.error(
f"ARI WebSocket missing query params: "
f"workflow_id={workflow_id}, user_id={user_id}, workflow_run_id={workflow_run_id}"
)
await websocket.close(code=4400, reason="Missing required query params")
return
# Accept with "media" subprotocol — chan_websocket sends
# Sec-WebSocket-Protocol: media and requires it echoed back.
await websocket.accept(subprotocol="media")
await _handle_telephony_websocket(
websocket, int(workflow_id), int(user_id), int(workflow_run_id)
)
@router.websocket("/ws/{workflow_id}/{user_id}/{workflow_run_id}")
async def websocket_endpoint(
websocket: WebSocket, workflow_id: int, user_id: int, workflow_run_id: int
):
"""WebSocket endpoint for real-time call handling - routes to provider-specific handlers."""
await websocket.accept()
await _handle_telephony_websocket(websocket, workflow_id, user_id, workflow_run_id)
async def _handle_telephony_websocket(
websocket: WebSocket, workflow_id: int, user_id: int, workflow_run_id: int
):
"""Shared WebSocket handler logic (connection already accepted)."""
try:
# Set the run context
set_current_run_id(workflow_run_id)

View file

@ -100,6 +100,10 @@ class ARIConfigurationRequest(BaseModel):
..., description="Stasis application name registered in Asterisk"
)
app_password: str = Field(..., description="ARI user password")
ws_client_name: str = Field(
default="",
description="websocket_client.conf connection name for externalMedia (e.g., dograh_staging)",
)
from_numbers: List[str] = Field(
default_factory=list,
description="List of SIP extensions/numbers for outbound calls (optional)",
@ -113,6 +117,7 @@ class ARIConfigurationResponse(BaseModel):
ari_endpoint: str
app_name: str
app_password: str # Masked
ws_client_name: str = ""
from_numbers: List[str]

View file

@ -96,8 +96,9 @@ def create_audio_config(transport_type: str) -> AudioConfig:
WorkflowRunMode.TWILIO.value,
WorkflowRunMode.VOBIZ.value,
WorkflowRunMode.CLOUDONIX.value,
WorkflowRunMode.ARI.value,
):
# Twilio, Cloudonix, and Vobiz use MULAW at 8kHz
# Twilio, Cloudonix, Vobiz, and ARI use MULAW at 8kHz
return AudioConfig(
transport_in_sample_rate=8000,
transport_out_sample_rate=8000,

View file

@ -32,6 +32,7 @@ from api.services.pipecat.service_factory import (
)
from api.services.pipecat.tracing_config import setup_pipeline_tracing
from api.services.pipecat.transport_setup import (
create_ari_transport,
create_cloudonix_transport,
create_twilio_transport,
create_vobiz_transport,
@ -197,6 +198,63 @@ async def run_pipeline_vonage(
raise
async def run_pipeline_ari(
websocket_client: WebSocket,
channel_id: str,
workflow_id: int,
workflow_run_id: int,
user_id: int,
) -> None:
"""Run pipeline for Asterisk ARI WebSocket connections.
ARI uses raw 16-bit signed linear PCM (SLIN16) at 16kHz
transmitted as binary WebSocket frames via chan_websocket.
"""
logger.info(f"Starting ARI pipeline for workflow run {workflow_run_id}")
set_current_run_id(workflow_run_id)
# Store call ID (channel_id) in cost_info
cost_info = {"call_id": channel_id}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
# Get workflow to extract configurations
workflow = await db_client.get_workflow(workflow_id, user_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
if "vad_configuration" in workflow.workflow_configurations:
vad_config = workflow.workflow_configurations["vad_configuration"]
if "ambient_noise_configuration" in workflow.workflow_configurations:
ambient_noise_config = workflow.workflow_configurations[
"ambient_noise_configuration"
]
try:
audio_config = create_audio_config(WorkflowRunMode.ARI.value)
transport = await create_ari_transport(
websocket_client,
channel_id,
workflow_run_id,
audio_config,
workflow.organization_id,
vad_config,
ambient_noise_config,
)
await _run_pipeline(
transport,
workflow_id,
workflow_run_id,
user_id,
audio_config=audio_config,
)
except Exception as e:
logger.error(f"Error in ARI pipeline: {e}")
raise
async def run_pipeline_vobiz(
websocket_client: WebSocket,
stream_id: str,

View file

@ -8,6 +8,7 @@ from api.enums import OrganizationConfigurationKey
from api.services.pipecat.audio_config import AudioConfig
from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.serializers.asterisk import AsteriskFrameSerializer
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.serializers.vobiz import VobizFrameSerializer
from pipecat.serializers.vonage import VonageFrameSerializer
@ -149,6 +150,70 @@ async def create_cloudonix_transport(
)
async def create_ari_transport(
websocket_client: WebSocket,
channel_id: str,
workflow_run_id: int,
audio_config: AudioConfig,
organization_id: int,
vad_config: dict | None = None,
ambient_noise_config: dict | None = None,
):
"""Create a transport for Asterisk ARI connections"""
from api.services.telephony.factory import load_telephony_config
config = await load_telephony_config(organization_id)
if config.get("provider") != "ari":
raise ValueError(f"Expected ARI provider, got {config.get('provider')}")
ari_endpoint = config.get("ari_endpoint")
app_name = config.get("app_name")
app_password = config.get("app_password")
if not ari_endpoint or not app_name or not app_password:
raise ValueError(
f"Incomplete ARI configuration for organization {organization_id}. "
f"Required: ari_endpoint, app_name, app_password"
)
serializer = AsteriskFrameSerializer(
channel_id=channel_id,
ari_endpoint=ari_endpoint,
app_name=app_name,
app_password=app_password,
params=AsteriskFrameSerializer.InputParams(
asterisk_sample_rate=audio_config.transport_in_sample_rate,
sample_rate=audio_config.pipeline_sample_rate,
),
)
return FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=audio_config.transport_in_sample_rate,
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_mixer=(
SoundfileMixer(
sound_files={
"office": APP_ROOT_DIR
/ "assets"
/ f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
},
default_sound="office",
volume=ambient_noise_config.get("volume", 0.3),
)
if ambient_noise_config and ambient_noise_config.get("enabled", False)
else SilenceAudioMixer()
),
serializer=serializer,
),
)
async def create_vonage_transport(
websocket_client,
call_uuid: str,

View file

@ -11,19 +11,25 @@ Standalone process that:
from api.logging_config import setup_logging
setup_logging()
import asyncio
import json
import signal
from typing import Any, Dict, Optional, Set
from typing import Dict, Optional, Set
from urllib.parse import urlparse
import aiohttp
import redis.asyncio as aioredis
import websockets
from loguru import logger
from api.constants import REDIS_URL
from api.db import db_client
from api.enums import OrganizationConfigurationKey
# Redis key pattern and TTL for channel-to-run mapping
_CHANNEL_KEY_PREFIX = "ari:channel:"
_CHANNEL_KEY_TTL = 3600 # 1 hour safety expiry
class ARIConnection:
"""Manages a single ARI WebSocket connection for an organization."""
@ -34,11 +40,13 @@ class ARIConnection:
ari_endpoint: str,
app_name: str,
app_password: str,
ws_client_name: str = "",
):
self.organization_id = organization_id
self.ari_endpoint = ari_endpoint.rstrip("/")
self.app_name = app_name
self.app_password = app_password
self.ws_client_name = ws_client_name
self._ws: Optional[websockets.ClientConnection] = None
self._task: Optional[asyncio.Task] = None
@ -47,6 +55,39 @@ class ARIConnection:
self._max_reconnect_delay = 300 # Max 300 seconds
self._ping_interval = 30 # Send ping every 30 seconds
# Redis client for channel-to-run reverse mapping (lazy init)
self._redis_client: Optional[aioredis.Redis] = None
async def _get_redis(self) -> aioredis.Redis:
"""Get Redis client instance (lazy init)."""
if not self._redis_client:
self._redis_client = await aioredis.from_url(
REDIS_URL, decode_responses=True
)
return self._redis_client
async def _set_channel_run(self, channel_id: str, workflow_run_id: str):
"""Store channel_id -> workflow_run_id mapping in Redis."""
r = await self._get_redis()
await r.set(
f"{_CHANNEL_KEY_PREFIX}{channel_id}",
workflow_run_id,
ex=_CHANNEL_KEY_TTL,
)
async def _get_channel_run(self, channel_id: str) -> Optional[str]:
"""Look up workflow_run_id for a channel_id from Redis."""
r = await self._get_redis()
return await r.get(f"{_CHANNEL_KEY_PREFIX}{channel_id}")
async def _delete_channel_run(self, *channel_ids: str):
"""Delete channel-to-run mapping(s) from Redis."""
if not channel_ids:
return
r = await self._get_redis()
keys = [f"{_CHANNEL_KEY_PREFIX}{cid}" for cid in channel_ids]
await r.delete(*keys)
@property
def ws_url(self) -> str:
"""Build the ARI WebSocket URL."""
@ -178,14 +219,42 @@ class ARIConnection:
f"caller={caller.get('number', 'unknown')}, "
f"args={app_args}"
)
# TODO: This is where we'll integrate with the pipeline
# For now, just log the event
# Parse args to extract workflow context
args_dict = {}
for arg in app_args:
for pair in arg.split(","):
if "=" in pair:
key, value = pair.split("=", 1)
args_dict[key.strip()] = value.strip()
workflow_run_id = args_dict.get("workflow_run_id")
workflow_id = args_dict.get("workflow_id")
user_id = args_dict.get("user_id")
if not workflow_run_id or not workflow_id or not user_id:
logger.warning(
f"[ARI org={self.organization_id}] StasisStart missing required args: "
f"workflow_run_id={workflow_run_id}, workflow_id={workflow_id}, user_id={user_id}"
)
return
# Start pipeline connection in background task
asyncio.create_task(
self._handle_stasis_start(
channel_id, channel_state, workflow_run_id, workflow_id, user_id
)
)
elif event_type == "StasisEnd":
logger.info(
f"[ARI org={self.organization_id}] StasisEnd: "
f"channel={channel_id}"
f"[ARI org={self.organization_id}] StasisEnd: channel={channel_id}"
)
workflow_run_id = await self._get_channel_run(channel_id)
if workflow_run_id:
asyncio.create_task(
self._handle_stasis_end(channel_id, workflow_run_id)
)
elif event_type == "ChannelStateChange":
logger.debug(
@ -214,6 +283,255 @@ class ARIConnection:
f"channel={channel_id}"
)
async def _ari_request(self, method: str, path: str, **kwargs) -> dict:
"""Make an ARI REST API request."""
url = f"{self.ari_endpoint}/ari{path}"
auth = aiohttp.BasicAuth(self.app_name, self.app_password)
async with aiohttp.ClientSession() as session:
async with session.request(method, url, auth=auth, **kwargs) as response:
response_text = await response.text()
if response.status not in (200, 201, 204):
logger.error(
f"[ARI org={self.organization_id}] REST API error: "
f"{method} {path} -> {response.status}: {response_text}"
)
return {}
if response_text:
return json.loads(response_text)
return {}
async def _answer_channel(self, channel_id: str) -> bool:
"""Answer an ARI channel."""
await self._ari_request("POST", f"/channels/{channel_id}/answer")
# answer returns 204 No Content on success, so empty dict is OK
logger.info(f"[ARI org={self.organization_id}] Answered channel {channel_id}")
return True
async def _create_external_media(
self,
workflow_id: str,
user_id: str,
workflow_run_id: str,
) -> str:
"""Create an external media channel via chan_websocket.
Uses ARI externalMedia with transport=websocket so Asterisk connects
to our backend over WebSocket (via websocket_client.conf).
Dynamic routing params are passed as URI query params via v() in transport_data.
"""
# v() appends URI query params to the websocket_client.conf URL
# e.g. wss://api.dograh.com/ws/ari?workflow_id=1&user_id=2&workflow_run_id=3
transport_data = (
f"v(workflow_id={workflow_id},"
f"user_id={user_id},"
f"workflow_run_id={workflow_run_id})"
)
result = await self._ari_request(
"POST",
"/channels/externalMedia",
params={
"app": self.app_name,
"external_host": self.ws_client_name,
"format": "ulaw",
"transport": "websocket",
"encapsulation": "none",
"connection_type": "client",
"direction": "both",
"transport_data": transport_data,
},
)
ext_channel_id = result.get("id", "")
if ext_channel_id:
logger.info(
f"[ARI org={self.organization_id}] Created external media channel: {ext_channel_id}"
)
return ext_channel_id
async def _create_bridge_and_add_channels(self, channel_ids: list) -> str:
"""Create a bridge and add channels to it."""
# Create bridge
bridge_result = await self._ari_request(
"POST",
"/bridges",
params={"type": "mixing", "name": f"bridge-{channel_ids[0]}"},
)
bridge_id = bridge_result.get("id", "")
if not bridge_id:
logger.error(f"[ARI org={self.organization_id}] Failed to create bridge")
return ""
# Add channels to bridge
await self._ari_request(
"POST",
f"/bridges/{bridge_id}/addChannel",
params={"channel": ",".join(channel_ids)},
)
logger.info(
f"[ARI org={self.organization_id}] Bridge {bridge_id} created with channels: {channel_ids}"
)
return bridge_id
async def _handle_stasis_start(
self,
channel_id: str,
channel_state: str,
workflow_run_id: str,
workflow_id: str,
user_id: str,
):
"""Handle StasisStart by answering (if needed), creating external media, and bridging."""
try:
# 1. Only answer the channel if it's not already up
# For outbound calls, the channel enters Stasis in "Up" state
# after the remote party answers — no need to answer again.
# For inbound calls, the channel may be in "Ring" state.
if channel_state != "Up":
await self._answer_channel(channel_id)
logger.info(
f"[ARI org={self.organization_id}] Setting up external media for "
f"channel {channel_id} via ws_client={self.ws_client_name}"
)
# 2. Track channel for StasisEnd cleanup (Redis)
await self._set_channel_run(channel_id, workflow_run_id)
# 3. Create external media channel via chan_websocket
# Asterisk connects to our backend using websocket_client.conf config,
# with routing params appended as URI query params via v()
ext_channel_id = await self._create_external_media(
workflow_id, user_id, workflow_run_id
)
if not ext_channel_id:
logger.error(
f"[ARI org={self.organization_id}] Failed to create external media for {channel_id}"
)
return
# 4. Track ext channel for StasisEnd cleanup (Redis)
await self._set_channel_run(ext_channel_id, workflow_run_id)
# 5. Bridge the call channel with the external media channel
bridge_id = await self._create_bridge_and_add_channels(
[channel_id, ext_channel_id]
)
if not bridge_id:
logger.error(
f"[ARI org={self.organization_id}] Failed to bridge channels"
)
return
# 6. Store ARI resource IDs in gathered_context for cleanup/debugging
await db_client.update_workflow_run(
run_id=int(workflow_run_id),
gathered_context={
"ext_channel_id": ext_channel_id,
"bridge_id": bridge_id,
},
)
except Exception as e:
logger.error(
f"[ARI org={self.organization_id}] Error handling StasisStart "
f"for channel {channel_id}: {e}"
)
async def _handle_stasis_end(self, channel_id: str, workflow_run_id: str):
"""Full teardown of all ARI resources on any channel's StasisEnd.
When either channel (call or ext) fires StasisEnd, we tear down
the bridge and both channels like endConferenceOnExit.
"""
try:
workflow_run = await db_client.get_workflow_run_by_id(int(workflow_run_id))
if not workflow_run or not workflow_run.gathered_context:
logger.warning(
f"[ARI org={self.organization_id}] StasisEnd: no gathered_context "
f"for workflow_run {workflow_run_id}"
)
# Still clean up the Redis key for the channel that ended
await self._delete_channel_run(channel_id)
return
ctx = workflow_run.gathered_context
call_id = ctx.get("call_id")
ext_channel_id = ctx.get("ext_channel_id")
bridge_id = ctx.get("bridge_id")
# Delete the bridge first (removes channels from it)
if bridge_id:
await self._delete_bridge(bridge_id)
# Destroy both channels, skipping the one that already ended
for cid in (call_id, ext_channel_id):
if cid and cid != channel_id:
await self._delete_channel(cid)
# Clean up all Redis reverse-mapping keys
keys_to_delete = [
cid for cid in (call_id, ext_channel_id, channel_id) if cid
]
if keys_to_delete:
await self._delete_channel_run(*keys_to_delete)
logger.info(
f"[ARI org={self.organization_id}] StasisEnd full teardown for "
f"channel={channel_id}, call={call_id}, ext={ext_channel_id}, bridge={bridge_id}"
)
except Exception as e:
logger.error(
f"[ARI org={self.organization_id}] Error cleaning up StasisEnd "
f"for channel {channel_id}: {e}"
)
async def _delete_bridge(self, bridge_id: str):
"""Delete an ARI bridge. Ignores 404 (already gone)."""
url = f"{self.ari_endpoint}/ari/bridges/{bridge_id}"
auth = aiohttp.BasicAuth(self.app_name, self.app_password)
async with aiohttp.ClientSession() as session:
async with session.delete(url, auth=auth) as response:
if response.status in (200, 204):
logger.info(
f"[ARI org={self.organization_id}] Deleted bridge {bridge_id}"
)
elif response.status == 404:
logger.debug(
f"[ARI org={self.organization_id}] Bridge {bridge_id} already gone"
)
else:
text = await response.text()
logger.error(
f"[ARI org={self.organization_id}] Failed to delete bridge {bridge_id}: "
f"{response.status} {text}"
)
async def _delete_channel(self, channel_id: str):
"""Delete (hang up) an ARI channel. Ignores 404 (already gone)."""
url = f"{self.ari_endpoint}/ari/channels/{channel_id}"
auth = aiohttp.BasicAuth(self.app_name, self.app_password)
async with aiohttp.ClientSession() as session:
async with session.delete(url, auth=auth) as response:
if response.status in (200, 204):
logger.info(
f"[ARI org={self.organization_id}] Deleted channel {channel_id}"
)
elif response.status == 404:
logger.debug(
f"[ARI org={self.organization_id}] Channel {channel_id} already gone"
)
else:
text = await response.text()
logger.error(
f"[ARI org={self.organization_id}] Failed to delete channel {channel_id}: "
f"{response.status} {text}"
)
class ARIManager:
"""Manages ARI WebSocket connections for all organizations."""
@ -269,8 +587,11 @@ class ARIManager:
ari_endpoint = config["ari_endpoint"]
app_name = config["app_name"]
app_password = config["app_password"]
ws_client_name = config["ws_client_name"]
conn = ARIConnection(org_id, ari_endpoint, app_name, app_password)
conn = ARIConnection(
org_id, ari_endpoint, app_name, app_password, ws_client_name
)
key = conn.connection_key
active_keys.add(key)
@ -324,6 +645,7 @@ class ARIManager:
ari_endpoint = value.get("ari_endpoint")
app_name = value.get("app_name")
app_password = value.get("app_password")
ws_client_name = value.get("ws_client_name", "")
if not all([ari_endpoint, app_name, app_password]):
logger.warning(
@ -331,12 +653,19 @@ class ARIManager:
)
continue
if not ws_client_name:
logger.warning(
f"[ARI Manager] Missing ws_client_name for org {org_id}, "
f"externalMedia WebSocket won't work"
)
configs.append(
{
"organization_id": org_id,
"ari_endpoint": ari_endpoint,
"app_name": app_name,
"app_password": app_password,
"ws_client_name": ws_client_name,
}
)

View file

@ -139,4 +139,10 @@ async def get_all_telephony_providers() -> List[Type[TelephonyProvider]]:
Returns:
List of provider classes that can be used for webhook detection
"""
return [ARIProvider, CloudonixProvider, TwilioProvider, VobizProvider, VonageProvider]
return [
ARIProvider,
CloudonixProvider,
TwilioProvider,
VobizProvider,
VonageProvider,
]

View file

@ -13,6 +13,7 @@ import aiohttp
from fastapi import HTTPException
from loguru import logger
from api.db import db_client
from api.enums import WorkflowRunMode
from api.services.telephony.base import (
CallInitiationResult,
@ -92,25 +93,21 @@ class ARIProvider(TelephonyProvider):
params = {
"endpoint": sip_endpoint,
"app": self.app_name,
"appArgs": f"workflow_run_id={workflow_run_id}" if workflow_run_id else "",
"appArgs": ",".join(
filter(
None,
[
f"workflow_run_id={workflow_run_id}",
f"workflow_id={kwargs.get('workflow_id', '')}",
f"user_id={kwargs.get('user_id', '')}",
],
)
),
}
if from_number:
params["callerId"] = from_number
# Add variables for tracking
variables = {}
if workflow_run_id:
variables["WORKFLOW_RUN_ID"] = str(workflow_run_id)
if kwargs.get("workflow_id"):
variables["WORKFLOW_ID"] = str(kwargs["workflow_id"])
if kwargs.get("user_id"):
variables["USER_ID"] = str(kwargs["user_id"])
data = {}
if variables:
data["variables"] = variables
logger.info(
f"[ARI] Initiating call to {sip_endpoint} "
f"via app={self.app_name}, workflow_run_id={workflow_run_id}"
@ -120,7 +117,6 @@ class ARIProvider(TelephonyProvider):
async with session.post(
endpoint,
params=params,
json=data if data else None,
auth=self._get_auth(),
) as response:
response_text = await response.text()
@ -248,17 +244,25 @@ class ARIProvider(TelephonyProvider):
workflow_run_id: int,
) -> None:
"""
ARI WebSocket handling is done by the ari_manager process.
This method is a placeholder for the base class requirement.
Handle WebSocket connection from ARI externalMedia channel.
TODO: Implement pipeline integration when ready.
Unlike Twilio (which sends "connected" and "start" JSON messages),
Asterisk chan_websocket starts streaming audio immediately.
"""
logger.warning(
f"handle_websocket called for ARI provider - "
f"pipeline integration not yet implemented for workflow_run {workflow_run_id}"
from api.services.pipecat.run_pipeline import run_pipeline_ari
# Get channel_id from workflow run context
workflow_run = await db_client.get_workflow_run(workflow_run_id, user_id)
channel_id = ""
if workflow_run and workflow_run.gathered_context:
channel_id = workflow_run.gathered_context.get("call_id", "")
logger.info(
f"[ARI] Starting pipeline for workflow_run {workflow_run_id}, channel={channel_id}"
)
await websocket.close(
code=4501, reason="ARI pipeline integration not yet implemented"
await run_pipeline_ari(
websocket, channel_id, workflow_id, workflow_run_id, user_id
)
# ======== INBOUND CALL METHODS ========
@ -329,6 +333,7 @@ class ARIProvider(TelephonyProvider):
def generate_validation_error_response(error_type) -> tuple:
"""Generate JSON error response for validation failures."""
from fastapi import Response
from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError
message = TELEPHONY_ERROR_MESSAGES.get(
@ -388,9 +393,7 @@ class ARIProvider(TelephonyProvider):
try:
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint, auth=self._get_auth()
) as response:
async with session.post(endpoint, auth=self._get_auth()) as response:
if response.status in (200, 204):
logger.info(f"[ARI] Channel {channel_id} answered")
return True

View file

@ -57,6 +57,7 @@ interface TelephonyConfigForm {
ari_endpoint?: string;
app_name?: string;
app_password?: string;
ws_client_name?: string;
// Common field - multiple phone numbers
from_numbers: string[];
}
@ -153,6 +154,7 @@ export default function ConfigureTelephonyPage() {
setValue("ari_endpoint", ariConfig.ari_endpoint);
setValue("app_name", ariConfig.app_name);
setValue("app_password", ariConfig.app_password);
setValue("ws_client_name", ariConfig.ws_client_name);
setValue("from_numbers", ariConfig.from_numbers?.length > 0 ? ariConfig.from_numbers : [""]);
}
}
@ -254,6 +256,7 @@ export default function ConfigureTelephonyPage() {
ari_endpoint: data.ari_endpoint!,
app_name: data.app_name!,
app_password: data.app_password!,
ws_client_name: data.ws_client_name || "",
} as AriConfigurationRequest;
}
@ -898,6 +901,18 @@ export default function ConfigureTelephonyPage() {
)}
</div>
<div className="space-y-2">
<Label htmlFor="ws_client_name">WebSocket Client Name</Label>
<Input
id="ws_client_name"
placeholder="dograh_staging"
{...register("ws_client_name")}
/>
<p className="text-xs text-muted-foreground">
Connection name from Asterisk&apos;s websocket_client.conf for external media streaming
</p>
</div>
<div className="space-y-2">
<Label>SIP Extensions / Numbers (Optional)</Label>
{fromNumbers.map((number, index) => (

View file

@ -238,9 +238,14 @@ export const PhoneCallDialog = ({
{callLoading ? "Calling..." : "Start Call"}
</Button>
) : (
<Button onClick={() => onOpenChange(false)}>
Close
</Button>
<>
<Button variant="outline" onClick={() => { setCallSuccessMsg(null); setCallError(null); }}>
Call Again
</Button>
<Button onClick={() => onOpenChange(false)}>
Close
</Button>
</>
)}
</div>
</DialogFooter>

View file

@ -36,6 +36,10 @@ export type AriConfigurationRequest = {
* ARI user password
*/
app_password: string;
/**
* websocket_client.conf connection name for externalMedia (e.g., dograh_staging)
*/
ws_client_name?: string;
/**
* List of SIP extensions/numbers for outbound calls (optional)
*/
@ -50,6 +54,7 @@ export type AriConfigurationResponse = {
ari_endpoint: string;
app_name: string;
app_password: string;
ws_client_name?: string;
from_numbers: Array<string>;
};