feat: add asterisk ARI websocket interface (#159)

* chore: remove old files

* feat: ari outbound dialing

* feat: add websocket configuration for ARI

* feat: handling inbound calls

* delete ext channel from redis on stasis end

* fix: add lock in workflow run update, refactor _handle_stasis_start

* chore: update submodule

---------

Co-authored-by: Sabiha Khan <sabihak89@gmail.com>
This commit is contained in:
Abhishek 2026-02-17 19:32:03 +05:30 committed by GitHub
parent ee4a874e54
commit 7552b6c819
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 2076 additions and 4172 deletions

View file

@ -0,0 +1,71 @@
"""add ari mode
Revision ID: 6d2f94baf4b7
Revises: 1a7d74d54e8f
Create Date: 2026-02-15 13:52:29.285583
"""
from typing import Sequence, Union
from alembic import op
from alembic_postgresql_enum import TableReference
# revision identifiers, used by Alembic.
revision: str = "6d2f94baf4b7"
down_revision: Union[str, None] = "1a7d74d54e8f"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.sync_enum_values(
enum_schema="public",
enum_name="workflow_run_mode",
new_values=[
"ari",
"twilio",
"vonage",
"vobiz",
"cloudonix",
"webrtc",
"smallwebrtc",
"stasis",
"VOICE",
"CHAT",
],
affected_columns=[
TableReference(
table_schema="public", table_name="workflow_runs", column_name="mode"
)
],
enum_values_to_rename=[],
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.sync_enum_values(
enum_schema="public",
enum_name="workflow_run_mode",
new_values=[
"twilio",
"vonage",
"vobiz",
"cloudonix",
"stasis",
"webrtc",
"smallwebrtc",
"VOICE",
"CHAT",
],
affected_columns=[
TableReference(
table_schema="public", table_name="workflow_runs", column_name="mode"
)
],
enum_values_to_rename=[],
)
# ### end Alembic commands ###

View file

@ -2,7 +2,7 @@
import sentry_sdk
from api.constants import DEPLOYMENT_MODE, ENABLE_TELEMETRY, REDIS_URL, SENTRY_DSN
from api.constants import DEPLOYMENT_MODE, ENABLE_TELEMETRY, SENTRY_DSN
from api.logging_config import ENVIRONMENT, setup_logging
# Set up logging and get the listener for cleanup
@ -21,62 +21,27 @@ if SENTRY_DSN and (
from contextlib import asynccontextmanager
from typing import Optional
import redis.asyncio as aioredis
from fastapi import APIRouter, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from loguru import logger
from api.routes.main import router as main_router
from api.services.telephony.worker_event_subscriber import (
WorkerEventSubscriber,
setup_worker_subscriber,
)
from api.tasks.arq import get_arq_redis
API_PREFIX = "/api/v1"
# Global reference to worker subscriber for graceful shutdown
worker_subscriber_instance: Optional[WorkerEventSubscriber] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global worker_subscriber_instance
# warmup arq pool
await get_arq_redis()
# Setup Redis connection for distributed mode
redis = await aioredis.from_url(REDIS_URL, decode_responses=True)
# Setup worker subscriber (ARI Manager runs separately)
worker_subscriber = await setup_worker_subscriber(redis)
worker_subscriber_instance = worker_subscriber
# Store worker ID in app state for health check
app.state.worker_id = worker_subscriber.worker_id
app.state.worker_subscriber = worker_subscriber
yield # Run app
# Shutdown sequence - this runs when FastAPI is shutting down
logger.info("Starting graceful shutdown...")
# First, try graceful shutdown with timeout
if worker_subscriber:
try:
# Check if we should do graceful shutdown (e.g., if SIGTERM was received)
# For now, we'll attempt graceful shutdown for all shutdowns
await worker_subscriber.graceful_shutdown(max_wait_seconds=300)
except Exception as e:
logger.error(f"Error during graceful shutdown: {e}")
# Fall back to immediate stop
await worker_subscriber.stop()
await redis.aclose()
app = FastAPI(
title="Dograh API",

View file

@ -1,4 +1,4 @@
from typing import Any, Optional
from typing import Any, Dict, List, Optional
from sqlalchemy.future import select
@ -94,3 +94,27 @@ class OrganizationConfigurationClient(BaseDBClient):
"""Get the value of a configuration, returning default if not found."""
config = await self.get_configuration(organization_id, key)
return config.value if config else default
async def get_configurations_by_provider(
self, key: str, provider: str
) -> List[Dict[str, Any]]:
"""Get all organization configurations for a given key filtered by provider.
Returns a list of dicts with organization_id and the config value.
"""
async with self.async_session() as session:
result = await session.execute(
select(OrganizationConfigurationModel).where(
OrganizationConfigurationModel.key == key,
)
)
configs = result.scalars().all()
return [
{
"organization_id": config.organization_id,
"value": config.value,
}
for config in configs
if config.value and config.value.get("provider") == provider
]

View file

@ -321,8 +321,11 @@ class WorkflowRunClient(BaseDBClient):
state: str | None = None,
) -> WorkflowRunModel:
async with self.async_session() as session:
# Use SELECT FOR UPDATE to lock the row during the update
result = await session.execute(
select(WorkflowRunModel).where(WorkflowRunModel.id == run_id)
select(WorkflowRunModel)
.where(WorkflowRunModel.id == run_id)
.with_for_update()
)
run = result.scalars().first()
if not run:

View file

@ -18,16 +18,17 @@ class CallType(Enum):
class WorkflowRunMode(Enum):
ARI = "ari"
TWILIO = "twilio"
VONAGE = "vonage"
VOBIZ = "vobiz"
CLOUDONIX = "cloudonix"
STASIS = "stasis"
WEBRTC = "webrtc"
SMALLWEBRTC = "smallwebrtc"
# Historical, not used anymore. Don't
# use and don't remove
STASIS = "stasis"
VOICE = "VOICE"
CHAT = "CHAT"

View file

@ -8,6 +8,8 @@ from api.db import db_client
from api.db.models import UserModel
from api.enums import OrganizationConfigurationKey
from api.schemas.telephony_config import (
ARIConfigurationRequest,
ARIConfigurationResponse,
CloudonixConfigurationRequest,
CloudonixConfigurationResponse,
TelephonyConfigurationResponse,
@ -29,6 +31,7 @@ PROVIDER_MASKED_FIELDS = {
"vonage": ["private_key", "api_key", "api_secret"],
"vobiz": ["auth_id", "auth_token"],
"cloudonix": ["bearer_token"],
"ari": ["app_password"],
}
@ -125,6 +128,26 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
),
vobiz=None,
)
elif stored_provider == "ari":
ari_endpoint = config.value.get("ari_endpoint", "")
app_name = config.value.get("app_name", "")
app_password = config.value.get("app_password", "")
ws_client_name = config.value.get("ws_client_name", "")
from_numbers = config.value.get("from_numbers", [])
inbound_workflow_id = config.value.get("inbound_workflow_id")
return TelephonyConfigurationResponse(
ari=ARIConfigurationResponse(
provider="ari",
ari_endpoint=ari_endpoint,
app_name=app_name,
app_password=mask_key(app_password) if app_password else "",
ws_client_name=ws_client_name,
inbound_workflow_id=inbound_workflow_id,
from_numbers=from_numbers,
),
)
else:
return TelephonyConfigurationResponse()
@ -136,6 +159,7 @@ async def save_telephony_configuration(
VonageConfigurationRequest,
VobizConfigurationRequest,
CloudonixConfigurationRequest,
ARIConfigurationRequest,
],
user: UserModel = Depends(get_user),
):
@ -180,6 +204,16 @@ async def save_telephony_configuration(
"domain_id": request.domain_id,
"from_numbers": request.from_numbers,
}
elif request.provider == "ari":
config_value = {
"provider": "ari",
"ari_endpoint": request.ari_endpoint,
"app_name": request.app_name,
"app_password": request.app_password,
"ws_client_name": request.ws_client_name,
"inbound_workflow_id": request.inbound_workflow_id,
"from_numbers": request.from_numbers,
}
else:
raise HTTPException(
status_code=400, detail=f"Unsupported provider: {request.provider}"

View file

@ -1,45 +0,0 @@
import random
from loguru import logger
from api.db import db_client
from api.enums import WorkflowRunMode
from api.services.pipecat.run_pipeline import run_pipeline_ari_stasis
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from pipecat.utils.run_context import set_current_run_id
async def on_stasis_call(call: StasisRTPConnection, call_context_vars: dict):
workflow_id = call_context_vars.get("workflow_id") or call_context_vars.get(
"campaign_id"
)
user_id = call_context_vars.get("user_id")
assert workflow_id is not None
assert user_id is not None
try:
workflow_id = int(workflow_id)
user_id = int(user_id)
except ValueError:
logger.error(f"Invalid workflow ID or user ID: {workflow_id} or {user_id}")
return
workflow_run_name = f"WR-ARI-{random.randint(1000, 9999)}"
workflow_run = await db_client.create_workflow_run(
workflow_run_name, workflow_id, WorkflowRunMode.STASIS.value, user_id
)
set_current_run_id(workflow_run.id)
# Store the workflow_run_id in the connection for later use
call.workflow_run_id = workflow_run.id
# Connect channelID with Workflow run ID in logs
logger.info(
f"channelID: {call.caller_channel_id} run_id: {workflow_run.id} "
f"Received call for workflow ID {workflow_id}, user ID {user_id}"
)
await run_pipeline_ari_stasis(
call, workflow_id, workflow_run.id, user_id, call_context_vars
)

View file

@ -523,13 +523,47 @@ async def handle_ncco_webhook(
return json.loads(response_content)
@router.websocket("/ws/ari")
async def websocket_ari_endpoint(websocket: WebSocket):
"""WebSocket endpoint for ARI chan_websocket external media.
Asterisk connects here via chan_websocket. Routing params are passed as
query params (appended by the v() dial string option in externalMedia).
"""
workflow_id = websocket.query_params.get("workflow_id")
user_id = websocket.query_params.get("user_id")
workflow_run_id = websocket.query_params.get("workflow_run_id")
if not workflow_id or not user_id or not workflow_run_id:
logger.error(
f"ARI WebSocket missing query params: "
f"workflow_id={workflow_id}, user_id={user_id}, workflow_run_id={workflow_run_id}"
)
await websocket.close(code=4400, reason="Missing required query params")
return
# Accept with "media" subprotocol — chan_websocket sends
# Sec-WebSocket-Protocol: media and requires it echoed back.
await websocket.accept(subprotocol="media")
await _handle_telephony_websocket(
websocket, int(workflow_id), int(user_id), int(workflow_run_id)
)
@router.websocket("/ws/{workflow_id}/{user_id}/{workflow_run_id}")
async def websocket_endpoint(
websocket: WebSocket, workflow_id: int, user_id: int, workflow_run_id: int
):
"""WebSocket endpoint for real-time call handling - routes to provider-specific handlers."""
await websocket.accept()
await _handle_telephony_websocket(websocket, workflow_id, user_id, workflow_run_id)
async def _handle_telephony_websocket(
websocket: WebSocket, workflow_id: int, user_id: int, workflow_run_id: int
):
"""Shared WebSocket handler logic (connection already accepted)."""
try:
# Set the run context
set_current_run_id(workflow_run_id)

View file

@ -89,6 +89,42 @@ class CloudonixConfigurationResponse(BaseModel):
from_numbers: List[str]
class ARIConfigurationRequest(BaseModel):
"""Request schema for Asterisk ARI configuration."""
provider: str = Field(default="ari")
ari_endpoint: str = Field(
..., description="ARI base URL (e.g., http://asterisk.example.com:8088)"
)
app_name: str = Field(
..., description="Stasis application name registered in Asterisk"
)
app_password: str = Field(..., description="ARI user password")
ws_client_name: str = Field(
default="",
description="websocket_client.conf connection name for externalMedia (e.g., dograh_staging)",
)
inbound_workflow_id: Optional[int] = Field(
default=None, description="Workflow ID for inbound calls"
)
from_numbers: List[str] = Field(
default_factory=list,
description="List of SIP extensions/numbers for outbound calls (optional)",
)
class ARIConfigurationResponse(BaseModel):
"""Response schema for ARI configuration with masked sensitive fields."""
provider: str
ari_endpoint: str
app_name: str
app_password: str # Masked
ws_client_name: str = ""
inbound_workflow_id: Optional[int] = None
from_numbers: List[str]
class TelephonyConfigurationResponse(BaseModel):
"""Top-level telephony configuration response."""
@ -96,3 +132,4 @@ class TelephonyConfigurationResponse(BaseModel):
vonage: Optional[VonageConfigurationResponse] = None
vobiz: Optional[VobizConfigurationResponse] = None
cloudonix: Optional[CloudonixConfigurationResponse] = None
ari: Optional[ARIConfigurationResponse] = None

View file

@ -87,18 +87,18 @@ def create_audio_config(transport_type: str) -> AudioConfig:
"""Create audio configuration based on transport type.
Args:
transport_type: Type of transport ("webrtc", "twilio", "vonage", "vobiz", "cloudonix", "stasis")
transport_type: Type of transport ("webrtc", "twilio", "vonage", "vobiz", "cloudonix")
Returns:
AudioConfig instance with appropriate settings
"""
if transport_type in (
WorkflowRunMode.STASIS.value,
WorkflowRunMode.TWILIO.value,
WorkflowRunMode.VOBIZ.value,
WorkflowRunMode.CLOUDONIX.value,
WorkflowRunMode.ARI.value,
):
# Twilio, Cloudonix, Vobiz, and Stasis use MULAW at 8kHz
# Twilio, Cloudonix, Vobiz, and ARI use MULAW at 8kHz
return AudioConfig(
transport_in_sample_rate=8000,
transport_out_sample_rate=8000,

View file

@ -32,15 +32,14 @@ from api.services.pipecat.service_factory import (
)
from api.services.pipecat.tracing_config import setup_pipeline_tracing
from api.services.pipecat.transport_setup import (
create_ari_transport,
create_cloudonix_transport,
create_stasis_transport,
create_twilio_transport,
create_vobiz_transport,
create_vonage_transport,
create_webrtc_transport,
)
from api.services.pipecat.ws_sender_registry import get_ws_sender
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow import WorkflowGraph
@ -199,6 +198,63 @@ async def run_pipeline_vonage(
raise
async def run_pipeline_ari(
websocket_client: WebSocket,
channel_id: str,
workflow_id: int,
workflow_run_id: int,
user_id: int,
) -> None:
"""Run pipeline for Asterisk ARI WebSocket connections.
ARI uses raw 16-bit signed linear PCM (SLIN16) at 16kHz
transmitted as binary WebSocket frames via chan_websocket.
"""
logger.info(f"Starting ARI pipeline for workflow run {workflow_run_id}")
set_current_run_id(workflow_run_id)
# Store call ID (channel_id) in cost_info
cost_info = {"call_id": channel_id}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
# Get workflow to extract configurations
workflow = await db_client.get_workflow(workflow_id, user_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
if "vad_configuration" in workflow.workflow_configurations:
vad_config = workflow.workflow_configurations["vad_configuration"]
if "ambient_noise_configuration" in workflow.workflow_configurations:
ambient_noise_config = workflow.workflow_configurations[
"ambient_noise_configuration"
]
try:
audio_config = create_audio_config(WorkflowRunMode.ARI.value)
transport = await create_ari_transport(
websocket_client,
channel_id,
workflow_run_id,
audio_config,
workflow.organization_id,
vad_config,
ambient_noise_config,
)
await _run_pipeline(
transport,
workflow_id,
workflow_run_id,
user_id,
audio_config=audio_config,
)
except Exception as e:
logger.error(f"Error in ARI pipeline: {e}")
raise
async def run_pipeline_vobiz(
websocket_client: WebSocket,
stream_id: str,
@ -364,52 +420,6 @@ async def run_pipeline_smallwebrtc(
)
async def run_pipeline_ari_stasis(
stasis_connection: StasisRTPConnection,
workflow_id: int,
workflow_run_id: int,
user_id: int,
call_context_vars: dict,
) -> None:
"""Run pipeline for ARI connections"""
logger.debug(
f"Running pipeline for ARI connection with workflow_id: {workflow_id} and workflow_run_id: {workflow_run_id}"
)
set_current_run_id(workflow_run_id)
# Get workflow to extract all pipeline configurations
workflow = await db_client.get_workflow(workflow_id, user_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
if "vad_configuration" in workflow.workflow_configurations:
vad_config = workflow.workflow_configurations["vad_configuration"]
if "ambient_noise_configuration" in workflow.workflow_configurations:
ambient_noise_config = workflow.workflow_configurations[
"ambient_noise_configuration"
]
# Create audio configuration for Stasis
audio_config = create_audio_config(WorkflowRunMode.STASIS.value)
transport = create_stasis_transport(
stasis_connection,
workflow_run_id,
audio_config,
vad_config,
ambient_noise_config,
)
await _run_pipeline(
transport,
workflow_id,
workflow_run_id,
user_id,
call_context_vars=call_context_vars,
audio_config=audio_config,
stasis_connection=stasis_connection, # Pass connection for immediate transfers
)
async def _run_pipeline(
transport,
workflow_id: int,
@ -417,7 +427,6 @@ async def _run_pipeline(
user_id: int,
call_context_vars: dict = {},
audio_config: AudioConfig = None,
stasis_connection: Optional[StasisRTPConnection] = None,
) -> None:
"""
Run the pipeline with the given transport and configuration
@ -559,10 +568,6 @@ async def _run_pipeline(
engine.set_context(context)
engine.set_audio_config(audio_config)
# Set Stasis connection for immediate transfers (if available)
if stasis_connection:
engine.set_stasis_connection(stasis_connection)
assistant_params = LLMAssistantAggregatorParams(
expect_stripped_words=True,
correct_aggregation_callback=engine.create_aggregation_correction_callback(),

View file

@ -156,7 +156,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
Args:
user_config: User configuration containing TTS settings
transport_type: Type of transport (e.g., 'stasis', 'twilio', 'webrtc')
transport_type: Type of transport (e.g., 'twilio', 'webrtc')
"""
logger.info(
f"Creating TTS service: provider={user_config.tts.provider}, model={user_config.tts.model}"

View file

@ -6,14 +6,9 @@ from api.constants import APP_ROOT_DIR
from api.db import db_client
from api.enums import OrganizationConfigurationKey
from api.services.pipecat.audio_config import AudioConfig
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from api.services.telephony.stasis_rtp_serializer import StasisRTPFrameSerializer
from api.services.telephony.stasis_rtp_transport import (
StasisRTPTransport,
StasisRTPTransportParams,
)
from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.serializers.asterisk import AsteriskFrameSerializer
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.serializers.vobiz import VobizFrameSerializer
from pipecat.serializers.vonage import VonageFrameSerializer
@ -156,6 +151,70 @@ async def create_cloudonix_transport(
)
async def create_ari_transport(
websocket_client: WebSocket,
channel_id: str,
workflow_run_id: int,
audio_config: AudioConfig,
organization_id: int,
vad_config: dict | None = None,
ambient_noise_config: dict | None = None,
):
"""Create a transport for Asterisk ARI connections"""
from api.services.telephony.factory import load_telephony_config
config = await load_telephony_config(organization_id)
if config.get("provider") != "ari":
raise ValueError(f"Expected ARI provider, got {config.get('provider')}")
ari_endpoint = config.get("ari_endpoint")
app_name = config.get("app_name")
app_password = config.get("app_password")
if not ari_endpoint or not app_name or not app_password:
raise ValueError(
f"Incomplete ARI configuration for organization {organization_id}. "
f"Required: ari_endpoint, app_name, app_password"
)
serializer = AsteriskFrameSerializer(
channel_id=channel_id,
ari_endpoint=ari_endpoint,
app_name=app_name,
app_password=app_password,
params=AsteriskFrameSerializer.InputParams(
asterisk_sample_rate=audio_config.transport_in_sample_rate,
sample_rate=audio_config.pipeline_sample_rate,
),
)
return FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=audio_config.transport_in_sample_rate,
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_mixer=(
SoundfileMixer(
sound_files={
"office": APP_ROOT_DIR
/ "assets"
/ f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
},
default_sound="office",
volume=ambient_noise_config.get("volume", 0.3),
)
if ambient_noise_config and ambient_noise_config.get("enabled", False)
else SilenceAudioMixer()
),
serializer=serializer,
),
)
async def create_vonage_transport(
websocket_client,
call_uuid: str,
@ -345,47 +404,6 @@ def create_webrtc_transport(
)
def create_stasis_transport(
stasis_connection: StasisRTPConnection,
workflow_run_id: int,
audio_config: AudioConfig,
vad_config: dict | None = None,
ambient_noise_config: dict | None = None,
):
"""Create a transport for ARI connections"""
serializer = StasisRTPFrameSerializer(
StasisRTPFrameSerializer.InputParams(
sample_rate=audio_config.transport_in_sample_rate
)
)
return StasisRTPTransport(
stasis_connection,
params=StasisRTPTransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_in_sample_rate=audio_config.transport_in_sample_rate,
# audio_out_10ms_chunks=2, # ToDo: Check if we cant support 40 ms packets?
audio_out_mixer=(
SoundfileMixer(
sound_files={
"office": APP_ROOT_DIR
/ "assets"
/ f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
},
default_sound="office",
volume=ambient_noise_config.get("volume", 0.3),
)
if ambient_noise_config and ambient_noise_config.get("enabled", False)
else SilenceAudioMixer()
),
serializer=serializer,
),
)
def create_internal_transport(
workflow_run_id: int,
audio_config: AudioConfig,

View file

@ -1,765 +0,0 @@
"""
Dynamic ARI client that generates methods from Swagger/OpenAPI specification.
Pure asyncio implementation without anyio dependencies.
"""
import asyncio
import json
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional
from urllib.parse import urljoin
import aiohttp
from loguru import logger
class SwaggerMethod:
"""Represents a Swagger API method."""
def __init__(
self, client: "AsyncARIClient", path: str, method: str, operation: dict
):
self.client = client
self.path = path
self.http_method = method.upper()
self.operation = operation
self.operation_id = operation.get("operationId", "")
self.parameters = operation.get("parameters", [])
self.description = operation.get("description", "")
def _build_path(self, **kwargs) -> str:
"""Build the actual path by substituting path parameters."""
path = self.path
# Replace path parameters like {channelId} with actual values
for param in self.parameters:
# Swagger spec uses 'paramType' not 'in'
if param.get("paramType", param.get("in")) == "path":
param_name = param["name"]
if param_name in kwargs:
path = path.replace(f"{{{param_name}}}", str(kwargs[param_name]))
return path
def _build_params(self, **kwargs) -> dict:
"""Extract query parameters from kwargs."""
params = {}
for param in self.parameters:
# Swagger spec uses 'paramType' not 'in'
if param.get("paramType", param.get("in")) == "query":
param_name = param["name"]
if param_name in kwargs:
params[param_name] = kwargs[param_name]
return params
def _build_body(self, **kwargs) -> dict:
"""Extract body parameters from kwargs."""
body = {}
for param in self.parameters:
# Swagger 1.2 uses 'paramType' = 'body' for body parameters
if param.get("paramType", param.get("in")) == "body":
param_name = param["name"]
if param_name in kwargs:
# In Swagger 1.2, body param is usually the whole body
return (
kwargs[param_name]
if isinstance(kwargs[param_name], dict)
else {param_name: kwargs[param_name]}
)
return body
async def __call__(self, **kwargs):
"""Execute the API method."""
path = self._build_path(**kwargs)
params = self._build_params(**kwargs)
# Check if there's a body parameter defined in the spec
body_data = self._build_body(**kwargs)
# If no body param in spec, use remaining kwargs for body (backward compat)
if not body_data:
# Remove path and query parameters from kwargs (leaving body params)
# Swagger spec uses 'paramType' not 'in'
path_param_names = {
p["name"]
for p in self.parameters
if p.get("paramType", p.get("in")) == "path"
}
query_param_names = {
p["name"]
for p in self.parameters
if p.get("paramType", p.get("in")) == "query"
}
body_param_names = {
p["name"]
for p in self.parameters
if p.get("paramType", p.get("in")) == "body"
}
body_data = {
k: v
for k, v in kwargs.items()
if k not in path_param_names
and k not in query_param_names
and k not in body_param_names
}
# Debug logging for externalMedia
if "externalMedia" in path:
logger.debug(
f"externalMedia call - method: {self.http_method}, path: {path}, params: {params}"
)
if self.http_method == "GET":
return await self.client.api_get(path, **params)
elif self.http_method == "POST":
return await self.client.api_post(
path, json_data=body_data if body_data else None, **params
)
elif self.http_method == "PUT":
return await self.client.api_put(
path, json_data=body_data if body_data else None, **params
)
elif self.http_method == "DELETE":
return await self.client.api_delete(path, **params)
else:
raise ValueError(f"Unsupported HTTP method: {self.http_method}")
class ResourceAPI:
"""Represents a resource API (like channels, bridges, etc.)."""
def __init__(self, client: "AsyncARIClient", resource_name: str):
self.client = client
self.resource_name = resource_name
self._methods = {}
def add_method(self, method_name: str, swagger_method: SwaggerMethod):
"""Add a method to this resource."""
self._methods[method_name] = swagger_method
def __getattr__(self, name):
"""Dynamically return methods."""
if name in self._methods:
return self._methods[name]
raise AttributeError(f"'{self.resource_name}' has no method '{name}'")
@dataclass
class Channel:
"""Channel model with dynamic method support."""
id: str
name: str = ""
state: str = ""
caller: Dict[str, str] = field(default_factory=dict)
connected: Dict[str, str] = field(default_factory=dict)
accountcode: str = ""
dialplan: Dict[str, str] = field(default_factory=dict)
creationtime: str = ""
language: str = "en"
# Store reference to client for method calls
_client: Optional["AsyncARIClient"] = field(default=None, repr=False)
@classmethod
def from_dict(cls, data: dict, client=None) -> "Channel":
"""Create Channel from API response."""
channel = cls(
id=data.get("id", ""),
name=data.get("name", ""),
state=data.get("state", ""),
caller=data.get("caller", {}),
connected=data.get("connected", {}),
accountcode=data.get("accountcode", ""),
dialplan=data.get("dialplan", {}),
creationtime=data.get("creationtime", ""),
language=data.get("language", "en"),
_client=client,
)
return channel
async def continueInDialplan(
self,
context: str = None,
extension: str = None,
priority: int = None,
label: str = None,
):
"""Continue channel in dialplan."""
if not self._client:
raise RuntimeError("Channel not associated with a client")
params = {"channelId": self.id}
if context:
params["context"] = context
if extension:
params["extension"] = extension
if priority is not None:
params["priority"] = priority
if label:
params["label"] = label
# The ARI API method is named 'continueInDialplan'
channels_api = self._client.channels
if hasattr(channels_api, "continueInDialplan"):
await channels_api.continueInDialplan(**params)
else:
# Fallback to direct API call
await self._client.api_post(f"/channels/{self.id}/continue", **params)
async def hangup(self, reason: str = "normal"):
"""Hangup the channel."""
if not self._client:
raise RuntimeError("Channel not associated with a client")
await self._client.channels.hangup(channelId=self.id, reason=reason)
async def answer(self):
"""Answer the channel."""
if not self._client:
raise RuntimeError("Channel not associated with a client")
await self._client.channels.answer(channelId=self.id)
async def getChannelVar(self, variable: str):
"""Get a channel variable."""
if not self._client:
raise RuntimeError("Channel not associated with a client")
return await self._client.channels.getChannelVar(
channelId=self.id, variable=variable
)
@dataclass
class Bridge:
"""Bridge model with dynamic method support."""
id: str
technology: str = ""
bridge_type: str = ""
bridge_class: str = ""
creator: str = ""
name: str = ""
channels: List[str] = field(default_factory=list)
_client: Optional["AsyncARIClient"] = field(default=None, repr=False)
@classmethod
def from_dict(cls, data: dict, client=None) -> "Bridge":
"""Create Bridge from API response."""
return cls(
id=data.get("id", ""),
technology=data.get("technology", ""),
bridge_type=data.get("bridge_type", ""),
bridge_class=data.get("bridge_class", ""),
creator=data.get("creator", ""),
name=data.get("name", ""),
channels=data.get("channels", []),
_client=client,
)
async def addChannel(self, channel: str):
"""Add channel to bridge."""
if not self._client:
raise RuntimeError("Bridge not associated with a client")
await self._client.bridges.addChannel(bridgeId=self.id, channel=channel)
async def removeChannel(self, channel: str):
"""Remove channel from bridge."""
if not self._client:
raise RuntimeError("Bridge not associated with a client")
await self._client.bridges.removeChannel(bridgeId=self.id, channel=channel)
async def destroy(self):
"""Destroy the bridge."""
if not self._client:
raise RuntimeError("Bridge not associated with a client")
await self._client.bridges.destroy(bridgeId=self.id)
class AsyncARIClient:
"""ARI client that dynamically generates methods from Swagger spec."""
def __init__(self, base_url: str, username: str, password: str, app: str):
self.base_url = base_url.rstrip("/")
self.username = username
self.password = password
self.app = app
# REST API URL
self.api_url = self.base_url.replace("ws://", "http://").replace(
"wss://", "https://"
)
# WebSocket URL
self.ws_url = (
f"{self.base_url}/ari/events?app={app}&api_key={username}:{password}"
)
# Session and WebSocket
self._session: Optional[aiohttp.ClientSession] = None
self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None
self._running = False
# Event handling
self._event_handlers: Dict[str, List[Callable]] = {}
self._event_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
# Resource APIs (will be populated from Swagger)
self.channels: Optional[ResourceAPI] = None
self.bridges: Optional[ResourceAPI] = None
self.endpoints: Optional[ResourceAPI] = None
self.recordings: Optional[ResourceAPI] = None
self.sounds: Optional[ResourceAPI] = None
self.playbacks: Optional[ResourceAPI] = None
self.asterisk: Optional[ResourceAPI] = None
self.applications: Optional[ResourceAPI] = None
self.deviceStates: Optional[ResourceAPI] = None
self.mailboxes: Optional[ResourceAPI] = None
# Swagger spec cache
self._swagger_spec: Optional[dict] = None
async def connect(self):
"""Connect to ARI and load Swagger spec."""
# Create HTTP session
auth = aiohttp.BasicAuth(self.username, self.password)
self._session = aiohttp.ClientSession(auth=auth)
try:
# Load Swagger spec and generate methods
await self._load_swagger_spec()
# Connect WebSocket
self._websocket = await self._session.ws_connect(
self.ws_url, heartbeat=30, autoping=True
)
self._running = True
logger.info(f"Connected to ARI at {self.ws_url}")
except Exception as e:
await self._session.close()
raise Exception(f"Failed to connect to ARI: {e}")
async def _load_swagger_spec(self):
"""Load Swagger spec and generate dynamic methods."""
spec_loaded = False
try:
# Get Swagger spec from ARI
url = f"{self.api_url}/ari/api-docs/resources.json"
async with self._session.get(url) as resp:
resp.raise_for_status()
resources = await resp.json()
# Store the spec
self._swagger_spec = resources
# Create resource APIs
for api_info in resources.get("apis", []):
resource_path = api_info["path"]
# Fix the path - remove .{format} and add proper prefix
resource_path = resource_path.replace(".{format}", ".json")
# Load detailed spec for this resource
# The resource_path already contains /api-docs/, so we just need the base URL
url = f"{self.api_url}/ari{resource_path}"
try:
async with self._session.get(url) as resp:
resp.raise_for_status()
spec = await resp.json()
self._process_swagger_spec(spec)
spec_loaded = True
except Exception as e:
logger.warning(f"Failed to load spec for {resource_path}: {e}")
if spec_loaded:
logger.info("Loaded Swagger spec and generated dynamic methods")
else:
raise Exception("No individual specs could be loaded")
except Exception as e:
logger.warning(f"Failed to load Swagger spec, using fallback methods: {e}")
self._create_fallback_methods()
def _process_swagger_spec(self, spec: dict):
"""Process a Swagger spec and create dynamic methods."""
# basePath is available in spec but not currently used
for api in spec.get("apis", []):
path = api["path"]
for operation in api.get("operations", []):
self._create_method_from_operation(path, operation)
def _create_method_from_operation(self, path: str, operation: dict):
"""Create a method from a Swagger operation."""
# Swagger spec uses 'httpMethod' not 'method'
method = operation.get("httpMethod", operation.get("method", "GET"))
operation_id = operation.get("nickname", "")
if not operation_id:
return
# Determine resource from path (e.g., /channels/{channelId} -> channels)
path_parts = path.strip("/").split("/")
if path_parts:
resource_name = path_parts[0]
# Create resource API if it doesn't exist
if not hasattr(self, resource_name) or getattr(self, resource_name) is None:
setattr(self, resource_name, ResourceAPI(self, resource_name))
resource_api = getattr(self, resource_name)
# Extract method name from operation ID
# e.g., "channels_continue" -> "continue_"
# or "channels_get" -> "get"
method_name = operation_id
if method_name.startswith(resource_name + "_"):
method_name = method_name[len(resource_name) + 1 :]
# Handle special cases
if method_name == "continue":
method_name = "continue_" # Avoid Python keyword
# Create and add the method
swagger_method = SwaggerMethod(self, path, method, operation)
resource_api.add_method(method_name, swagger_method)
def _create_fallback_methods(self):
"""Create fallback methods if Swagger spec is not available."""
# Create basic resource APIs
self.channels = ResourceAPI(self, "channels")
self.bridges = ResourceAPI(self, "bridges")
# Add essential channel methods
self.channels.add_method(
"get",
SwaggerMethod(
self,
"/channels/{channelId}",
"GET",
{
"operationId": "get",
"parameters": [{"name": "channelId", "in": "path"}],
},
),
)
self.channels.add_method(
"hangup",
SwaggerMethod(
self,
"/channels/{channelId}",
"DELETE",
{
"operationId": "hangup",
"parameters": [
{"name": "channelId", "in": "path"},
{"name": "reason", "in": "query"},
],
},
),
)
self.channels.add_method(
"answer",
SwaggerMethod(
self,
"/channels/{channelId}/answer",
"POST",
{
"operationId": "answer",
"parameters": [{"name": "channelId", "in": "path"}],
},
),
)
self.channels.add_method(
"continueInDialplan",
SwaggerMethod(
self,
"/channels/{channelId}/continue",
"POST",
{
"operationId": "continueInDialplan",
"parameters": [
{"name": "channelId", "in": "path"},
{"name": "context", "in": "query"},
{"name": "extension", "in": "query"},
{"name": "priority", "in": "query"},
{"name": "label", "in": "query"},
],
},
),
)
self.channels.add_method(
"externalMedia",
SwaggerMethod(
self,
"/channels/externalMedia",
"POST",
{
"operationId": "externalMedia",
"parameters": [
{"name": "channelId", "in": "query"}, # Add channelId parameter
{"name": "app", "in": "query"},
{"name": "external_host", "in": "query"},
{"name": "format", "in": "query"},
{"name": "encapsulation", "in": "query"},
{"name": "transport", "in": "query"},
{"name": "connection_type", "in": "query"},
{"name": "direction", "in": "query"},
],
},
),
)
self.channels.add_method(
"getChannelVar",
SwaggerMethod(
self,
"/channels/{channelId}/variable",
"GET",
{
"operationId": "getChannelVar",
"parameters": [
{"name": "channelId", "in": "path"},
{"name": "variable", "in": "query"},
],
},
),
)
# Add essential bridge methods
self.bridges.add_method(
"get",
SwaggerMethod(
self,
"/bridges/{bridgeId}",
"GET",
{
"operationId": "get",
"parameters": [{"name": "bridgeId", "in": "path"}],
},
),
)
self.bridges.add_method(
"create",
SwaggerMethod(
self,
"/bridges",
"POST",
{
"operationId": "create",
"parameters": [
{"name": "type", "in": "query"},
{"name": "name", "in": "query"},
],
},
),
)
self.bridges.add_method(
"addChannel",
SwaggerMethod(
self,
"/bridges/{bridgeId}/addChannel",
"POST",
{
"operationId": "addChannel",
"parameters": [
{"name": "bridgeId", "in": "path"},
{"name": "channel", "in": "query"},
],
},
),
)
self.bridges.add_method(
"removeChannel",
SwaggerMethod(
self,
"/bridges/{bridgeId}/removeChannel",
"POST",
{
"operationId": "removeChannel",
"parameters": [
{"name": "bridgeId", "in": "path"},
{"name": "channel", "in": "query"},
],
},
),
)
self.bridges.add_method(
"destroy",
SwaggerMethod(
self,
"/bridges/{bridgeId}",
"DELETE",
{
"operationId": "destroy",
"parameters": [{"name": "bridgeId", "in": "path"}],
},
),
)
async def disconnect(self):
"""Disconnect from ARI."""
self._running = False
if self._websocket:
await self._websocket.close()
if self._session:
await self._session.close()
async def run(self):
"""Main event loop."""
if not self._websocket:
raise RuntimeError("Not connected")
processor_task = asyncio.create_task(self._process_events())
try:
async for msg in self._websocket:
if msg.type == aiohttp.WSMsgType.TEXT:
try:
event = json.loads(msg.data)
# Wrap channel/bridge objects
if "channel" in event and isinstance(event["channel"], dict):
event["channel"] = Channel.from_dict(event["channel"], self)
if "bridge" in event and isinstance(event["bridge"], dict):
event["bridge"] = Bridge.from_dict(event["bridge"], self)
await self._event_queue.put(event)
except json.JSONDecodeError:
logger.error(f"Invalid JSON: {msg.data}")
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.error(f"WebSocket error: {self._websocket.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSED:
logger.info("WebSocket closed")
break
finally:
self._running = False
processor_task.cancel()
await asyncio.gather(processor_task, return_exceptions=True)
async def _process_events(self):
"""Process events from queue."""
while self._running:
try:
event = await asyncio.wait_for(self._event_queue.get(), timeout=1.0)
event_type = event.get("type")
if event_type:
await self._dispatch_event(event_type, event)
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error processing event: {e}")
async def _dispatch_event(self, event_type: str, event: dict):
"""Dispatch event to handlers."""
handlers = self._event_handlers.get(event_type, [])
if handlers:
logger.debug(
f"AsyncARIClient: Dispatching {event_type} to {len(handlers)} handlers"
)
for i, handler in enumerate(handlers):
try:
logger.debug(
f" AsyncARIClient: Calling {event_type} handler {i + 1}/{len(handlers)}"
)
await handler(event)
except Exception as e:
logger.error(f"Handler {i + 1} error for {event_type}: {e}")
def on_event(self, event_type: str, handler: Callable):
"""Register event handler."""
if event_type not in self._event_handlers:
self._event_handlers[event_type] = []
logger.debug(
f"AsyncARIClient: Registering handler for {event_type}. Current count: {len(self._event_handlers.get(event_type, []))}"
)
self._event_handlers[event_type].append(handler)
logger.debug(
f"AsyncARIClient: After registration, {event_type} handler count: {len(self._event_handlers[event_type])}"
)
# REST API methods
async def api_get(self, path: str, **params) -> dict:
"""GET request."""
# Ensure path starts with /ari if not already
if not path.startswith("/ari"):
path = f"/ari{path}" if path.startswith("/") else f"/ari/{path}"
url = urljoin(self.api_url, path.lstrip("/"))
async with self._session.get(url, params=params) as resp:
resp.raise_for_status()
data = await resp.json()
# Wrap known objects
if isinstance(data, list):
# Handle lists of channels/bridges
if "/channels" in path:
return [
Channel.from_dict(item, self)
if isinstance(item, dict)
else item
for item in data
]
elif "/bridges" in path:
return [
Bridge.from_dict(item, self) if isinstance(item, dict) else item
for item in data
]
return data
elif isinstance(data, dict):
if "/channels/" in path and "id" in data:
return Channel.from_dict(data, self)
elif "/bridges/" in path and "id" in data:
return Bridge.from_dict(data, self)
return data
async def api_post(self, path: str, json_data: dict = None, **params) -> dict:
"""POST request."""
# Ensure path starts with /ari if not already
if not path.startswith("/ari"):
path = f"/ari{path}" if path.startswith("/") else f"/ari/{path}"
url = urljoin(self.api_url, path.lstrip("/"))
async with self._session.post(url, json=json_data, params=params) as resp:
resp.raise_for_status()
if resp.content_length and resp.content_length > 0:
data = await resp.json()
# Wrap known objects
if "id" in data and "state" in data:
return Channel.from_dict(data, self)
elif "id" in data and "bridge_type" in data:
return Bridge.from_dict(data, self)
return data
return {}
async def api_put(self, path: str, json_data: dict = None, **params) -> dict:
"""PUT request."""
# Ensure path starts with /ari if not already
if not path.startswith("/ari"):
path = f"/ari{path}" if path.startswith("/") else f"/ari/{path}"
url = urljoin(self.api_url, path.lstrip("/"))
async with self._session.put(url, json=json_data, params=params) as resp:
resp.raise_for_status()
if resp.content_length and resp.content_length > 0:
return await resp.json()
return {}
async def api_delete(self, path: str, **params) -> dict:
"""DELETE request."""
# Ensure path starts with /ari if not already
if not path.startswith("/ari"):
path = f"/ari{path}" if path.startswith("/") else f"/ari/{path}"
url = urljoin(self.api_url, path.lstrip("/"))
async with self._session.delete(url, params=params) as resp:
resp.raise_for_status()
if resp.content_length and resp.content_length > 0:
return await resp.json()
return {}

View file

@ -1,437 +0,0 @@
"""
ARI Client Manager using the new Async ARI Client.
Drop-in replacement for the existing ari_client_manager.py.
"""
import asyncio
import json
import os
import random
import time
from typing import Awaitable, Callable, Optional
import httpx
from loguru import logger
from api.services.telephony.ari_client import AsyncARIClient, Channel
from api.services.telephony.ari_client_singleton import ari_client_singleton
class ARIClientManager:
"""Manages ARI client connection and event handling.
This is a compatibility wrapper around AsyncARIClient.
"""
def __init__(
self,
ari_client: AsyncARIClient,
app_endpoint: str,
_conn_ctx=None, # Not used with AsyncARIClient
):
"""Initialize the ARI client manager.
Parameters
----------
ari_client: AsyncARIClient
The connected ARI client.
app_endpoint: str
The app endpoint for external media.
_conn_ctx:
Not used, kept for compatibility.
"""
self._ari_client = ari_client
self._app_endpoint = app_endpoint
self._conn_ctx = _conn_ctx # Not used but kept for compatibility
self._start_handlers = []
self._end_handlers = []
self._running = False
self._handlers_registered = False # Track if handlers are registered
def register_start_handler(
self, handler: Callable[[Channel, dict], Awaitable[None]]
):
"""Register a handler for StasisStart events."""
logger.debug(
f"Registering start handler. Current count: {len(self._start_handlers)}"
)
self._start_handlers.append(handler)
logger.debug(f"After registration, handler count: {len(self._start_handlers)}")
def register_end_handler(self, handler: Callable[[str], Awaitable[None]]):
"""Register a handler for StasisEnd events."""
self._end_handlers.append(handler)
async def update_client(self, new_client: AsyncARIClient, new_conn_ctx=None):
"""Update to a new client (for reconnection)."""
logger.info("Updating ARI client for reconnection")
self._ari_client = new_client
self._conn_ctx = new_conn_ctx
# Clear old event handlers from the client before re-registering
# to prevent duplicate handler registrations
if hasattr(new_client, "_event_handlers"):
new_client._event_handlers.clear()
# Re-register event handlers
self._register_handlers()
def _register_handlers(self):
"""Register event handlers with the client."""
logger.debug(
f"_register_handlers called. Start handlers count: {len(self._start_handlers)}, End handlers count: {len(self._end_handlers)}"
)
async def on_stasis_start(event):
"""Handle StasisStart events."""
channel = event.get("channel")
# Only handle PJSIP and SIP channels
if channel and hasattr(channel, "name"):
if not (
channel.name.startswith("PJSIP") or channel.name.startswith("SIP")
):
logger.debug(
f"Ignoring StasisStart for non-SIP channel: {channel.name}"
)
return
# Log the event
logger.info(
f"StasisStart event for channel: {channel.id if channel else 'unknown'}"
)
# Extract call context variables
call_context_vars = {}
try:
# Get channel variables
var_result = await channel.getChannelVar(
variable="LOCAL_ARI_CALL_VARIABLES"
)
call_context_vars = json.loads(var_result.get("value", "{}"))
# Try to get phone number and fetch additional data
phone_number = call_context_vars.get("phone")
ari_data_uri = os.getenv("ARI_DATA_FETCHING_URI")
if phone_number and ari_data_uri:
try:
start_time = time.time()
fetch_url = f"{ari_data_uri}{phone_number}"
async with httpx.AsyncClient() as client:
response = await client.get(fetch_url, timeout=10.0)
response.raise_for_status()
# Parse the response - get the latest line if multiple lines
response_text = response.text.strip()
if response_text:
lines = response_text.split("\n")
latest_line = lines[-1].strip()
if latest_line:
# Parse the pipe-delimited data
fields = latest_line.split("|")
field_names = [
"status",
"user",
"vendor_lead_code",
"source_id",
"list_id",
"gmt_offset_now",
"phone_code",
"phone_number",
"title",
"first_name",
"middle_initial",
"last_name",
"address1",
"address2",
"address3",
"city",
"state",
"province",
"postal_code",
"country_code",
"gender",
"date_of_birth",
"alt_phone",
"email",
"security_phrase",
"comments",
"called_count",
"last_local_call_time",
"rank",
"owner",
"entry_list_id",
"lead_id",
]
# Map fields to call_context_vars
for i, field_name in enumerate(field_names):
try:
call_context_vars[field_name] = fields[i]
except IndexError:
logger.error(
f"channelID: {channel.id} IndexError while accessing fields {i}"
)
elapsed_time = time.time() - start_time
logger.info(
f"channelID: {channel.id} Successfully fetched user details for phone: {phone_number} in {elapsed_time:.3f} seconds"
)
except Exception as e:
elapsed_time = time.time() - start_time
logger.error(
f"channelID: {channel.id} Failed to fetch user details from ARI_DATA_FETCHING_URI after {elapsed_time:.3f} seconds: {e}"
)
logger.debug(
f"channelID: {channel.id} call context variables: {call_context_vars}"
)
except (
KeyError,
AttributeError,
httpx.HTTPStatusError,
json.JSONDecodeError,
) as e:
logger.debug(f"could not find variable LOCAL_ARI_CALL_VARIABLES: {e}")
# Call all registered handlers with call_context_vars
logger.debug(
f"Calling {len(self._start_handlers)} start handlers for channel {channel.id}"
)
for i, handler in enumerate(self._start_handlers):
try:
logger.debug(
f" Calling start handler {i + 1}/{len(self._start_handlers)}"
)
await handler(channel, call_context_vars)
except Exception as e:
logger.error(f"Error in StasisStart handler {i + 1}: {e}")
async def on_stasis_end(event):
"""Handle StasisEnd events."""
channel = event.get("channel", {})
channel_id = channel.id if hasattr(channel, "id") else channel.get("id", "")
# # Only handle PJSIP and SIP channels
# if channel:
# channel_name = channel.name if hasattr(channel, 'name') else channel.get("name", "")
# if channel_name and not (channel_name.startswith("PJSIP") or channel_name.startswith("SIP")):
# logger.debug(f"Ignoring StasisEnd for non-SIP channel: {channel_name}")
# return
logger.info(f"StasisEnd event for channel: {channel_id}")
# Call all registered handlers
for handler in self._end_handlers:
try:
await handler(channel_id)
except Exception as e:
logger.error(f"Error in StasisEnd handler: {e}")
# Register with the AsyncARIClient
logger.debug(f"Registering StasisStart and StasisEnd with AsyncARIClient")
self._ari_client.on_event("StasisStart", on_stasis_start)
self._ari_client.on_event("StasisEnd", on_stasis_end)
logger.debug(f"Event handlers registered with client")
async def run(self):
"""Run the event loop.
The actual WebSocket handling is done by AsyncARIClient.
This just registers handlers and waits.
"""
logger.debug("Running ARIClientManager")
self._running = True
# Register handlers only once, on first run
if not self._handlers_registered:
self._register_handlers()
self._handlers_registered = True
try:
# The AsyncARIClient.run() method handles WebSocket
# We don't call it here as it's called by the supervisor
while self._running:
await asyncio.sleep(1)
except asyncio.CancelledError:
logger.debug(f"ARIClientManager run cancelled")
self._running = False
raise
finally:
self._running = False
class _ARIClientManagerSupervisor:
"""Supervisor that maintains ARI connection with automatic reconnection.
This replaces the asyncari-based supervisor with AsyncARIClient.
"""
# Reconnection parameters
_INITIAL_BACKOFF = 1 # Start with 1 second
_MAX_BACKOFF = 60 # Max 60 seconds between retries
def __init__(
self,
on_channel_start: Callable[[Channel, dict], Awaitable[None]],
on_channel_end: Optional[Callable[[str], Awaitable[None]]] = None,
):
self._on_channel_start = on_channel_start
self._on_channel_end = on_channel_end
self._shutting_down = False
async def start(self):
"""Start the supervisor and maintain connection."""
await self._runner()
async def stop(self):
"""Stop the supervisor."""
logger.info("Stopping ARI Client Manager Supervisor")
self._shutting_down = True
async def __aenter__(self):
"""Async context manager entry."""
asyncio.create_task(self.start())
return self
async def __aexit__(self, *args):
"""Async context manager exit."""
await self.stop()
async def _runner(self):
"""Main reconnection loop using AsyncARIClient."""
backoff = self._INITIAL_BACKOFF
ari_client_manager: Optional[ARIClientManager] = None
while not self._shutting_down:
client = None
try:
logger.debug("Going to connect with ARI")
# Get configuration from environment
base_url = os.getenv("ARI_STASIS_ENDPOINT")
username = os.getenv("ARI_STASIS_USER")
password = os.getenv("ARI_STASIS_USER_PASSWORD")
app = os.getenv("ARI_STASIS_APP_NAME")
# Convert HTTP to WebSocket URL
ws_url = base_url.replace("http://", "ws://").replace(
"https://", "wss://"
)
# Create and connect the AsyncARIClient
client = AsyncARIClient(ws_url, username, password, app)
await client.connect()
# Update the singleton with the new client
ari_client_singleton.set_client(client)
if ari_client_manager is None:
# First connection - create new manager
logger.debug("Creating new ARIClientManager (first connection)")
ari_client_manager = ARIClientManager(
client,
os.getenv("ARI_STASIS_APP_ENDPOINT"),
_conn_ctx=None, # Not needed with AsyncARIClient
)
logger.debug(f"Registering handlers with new manager")
ari_client_manager.register_start_handler(self._on_channel_start)
if self._on_channel_end:
ari_client_manager.register_end_handler(self._on_channel_end)
else:
# Reconnection - update existing manager
logger.debug("Updating existing ARIClientManager (reconnection)")
# Don't re-register start and end handlers as they're already registered
await ari_client_manager.update_client(client, None)
logger.info("Connected to ARI — supervisor entering event loop")
# Reset backoff after successful connection
backoff = self._INITIAL_BACKOFF
# Create tasks for both the client and manager
client_task = asyncio.create_task(client.run())
manager_task = asyncio.create_task(ari_client_manager.run())
# Wait for either to complete (likely due to disconnection)
done, pending = await asyncio.wait(
{client_task, manager_task}, return_when=asyncio.FIRST_COMPLETED
)
# Cancel the other task
for task in pending:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except asyncio.CancelledError:
# Check if we're shutting down
if self._shutting_down or asyncio.current_task().cancelled():
logger.debug("ARI supervisor task cancelled — shutting down")
break
# Otherwise it's a transient connection error
logger.warning("ARI connection lost due to CancelledError — will retry")
# Force a context switch to reset event loop state
await asyncio.sleep(0)
except Exception as exc:
# Check if we're shutting down
if self._shutting_down or asyncio.current_task().cancelled():
logger.warning("Exiting due to shutdown during exception handling")
break
# Log and retry
logger.warning(f"ARI connection failed or lost: {exc!r} - will retry")
finally:
# Disconnect client if connected
if client:
try:
await client.disconnect()
except Exception as e:
logger.warning(f"Error disconnecting client: {e}")
# Clear the singleton when disconnecting
ari_client_singleton.clear()
# Check if we're shutting down before sleeping
if self._shutting_down:
logger.debug("Exiting reconnection loop due to shutdown")
break
# Exponential back-off with jitter before the next attempt
jitter = random.uniform(0.1, backoff)
logger.debug(f"Waiting {jitter:.1f} seconds before reconnecting...")
# Sleep with proper event loop handling
await asyncio.sleep(0) # Yield control first
await asyncio.sleep(jitter)
logger.debug(f"Finished sleeping for {jitter} seconds")
backoff = min(backoff * 2, self._MAX_BACKOFF)
logger.debug(f"New backoff value: {backoff}, continuing loop...")
async def setup_ari_client_supervisor(
on_channel_start: Callable[[Channel, dict], Awaitable[None]],
on_channel_end: Callable[[str], Awaitable[None]] | None = None,
) -> "_ARIClientManagerSupervisor | None":
"""Start a background supervisor that keeps the ARI connection alive.
This is a drop-in replacement for the asyncari-based function.
Uses AsyncARIClient instead of asyncari.
"""
logger.info("Starting ARI Client Supervisor with AsyncARIClient")
supervisor = _ARIClientManagerSupervisor(on_channel_start, on_channel_end)
# Start the supervisor in the background
asyncio.create_task(supervisor.start())
return supervisor

View file

@ -1,50 +0,0 @@
"""Singleton holder for the current ARI client instance.
This module provides a thread-safe singleton that holds the current
ARI client instance, which can be updated during reconnections.
"""
from typing import Optional
from loguru import logger
from api.services.telephony.ari_client import AsyncARIClient
class ARIClientSingleton:
"""Singleton holder for the current ARI client instance."""
_instance: Optional["ARIClientSingleton"] = None
_client: Optional[AsyncARIClient] = None
def __new__(cls):
"""Ensure only one instance exists."""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def set_client(self, client: AsyncARIClient) -> None:
"""Update the ARI client instance.
Args:
client: The new ARI client instance.
"""
self._client = client
logger.info("ARI client singleton updated with new client instance")
def get_client(self) -> Optional[AsyncARIClient]:
"""Get the current ARI client instance.
Returns:
The current ARI client, or None if not set.
"""
return self._client
def clear(self) -> None:
"""Clear the current client instance."""
self._client = None
logger.info("ARI client singleton cleared")
# Global singleton instance
ari_client_singleton = ARIClientSingleton()

File diff suppressed because it is too large Load diff

View file

@ -1,323 +0,0 @@
"""ARI-specific Stasis connection for use by ARI Manager.
This connection has direct access to the ARI client and manages
the actual Asterisk channels, bridges, and RTP setup.
"""
import json
import os
import uuid
from typing import Optional
import httpx
from loguru import logger
from api.services.telephony.ari_client import AsyncARIClient, Bridge, Channel
from api.services.telephony.ari_client_singleton import ari_client_singleton
from pipecat.utils.base_object import BaseObject
class ARIManagerConnection(BaseObject):
"""ARI Manager's connection that directly controls Asterisk resources.
This class is used only by the ARI Manager process and has full
access to the ARI client for creating bridges, channels, etc.
"""
def __init__(
self,
caller_channel: Channel,
host: str,
port: int,
) -> None:
"""Initialize ARI Stasis connection.
Args:
caller_channel: The caller's channel object.
host: Host address for RTP transport.
port: Port number for RTP transport.
"""
super().__init__()
# External dependencies.
self._host: str = host
self._port: int = port
# Store channel IDs instead of Channel objects to avoid stale references
self.caller_channel_id: str = caller_channel.id
self.em_channel_id: Optional[str] = None # externalMedia channel ID
# Store bridge ID to avoid stale references after reconnection
self.bridge_id: Optional[str] = None
# RTP addressing information
self.local_addr = ("0.0.0.0", port)
self.remote_addr = None
# Internal state.
self._closed: bool = False
self._is_connected: bool = False
def is_connected(self) -> bool:
"""Check if the connection is established."""
return self._is_connected and not self._closed
@property
def _ari(self) -> Optional[AsyncARIClient]:
"""Get the current ARI client from singleton."""
return ari_client_singleton.get_client()
async def _get_channel(self, channel_id: str) -> Optional[Channel]:
"""Safely get a channel object by ID.
Returns None if the channel doesn't exist or can't be fetched.
"""
if not channel_id:
return None
try:
# Get current client from singleton
client = self._ari
if not client:
logger.warning(
f"Cannot get channel {channel_id} - No ARI client available"
)
return None
# Check if the session is still active
if not client._session or client._session.closed:
logger.warning(
f"Cannot get channel {channel_id} - ARI session is closed"
)
return None
return await client.channels.get(channelId=channel_id)
except Exception as e:
logger.warning(f"Could not get channel {channel_id} - {e}")
return None
async def _get_bridge(self, bridge_id: str) -> Optional[Bridge]:
"""Safely get a bridge object by ID.
Returns None if the bridge doesn't exist or can't be fetched.
"""
if not bridge_id:
return None
try:
# Get current client from singleton
client = self._ari
if not client:
logger.warning(
f"Cannot get bridge {bridge_id} - No ARI client available"
)
return None
# Check if the session is still active
if not client._session or client._session.closed:
logger.warning(f"Cannot get bridge {bridge_id} - ARI session is closed")
return None
return await client.bridges.get(bridgeId=bridge_id)
except Exception as e:
logger.warning(f"Could not get bridge {bridge_id}: {e}")
return None
async def _cleanup_resources(self):
"""Clean up external media channel and bridge."""
# Cleanup external media channel
try:
if self.em_channel_id:
em_channel = await self._get_channel(self.em_channel_id)
if em_channel:
await em_channel.hangup()
logger.debug(
f"channelID: {self.em_channel_id} Hung up external media"
)
self.em_channel_id = None
except Exception as exc:
logger.warning(
f"Failed to hang-up externalMedia channel: {self.em_channel_id}"
f"Error: {exc}"
)
# Cleanup bridge
try:
if self.bridge_id:
bridge = await self._get_bridge(self.bridge_id)
if bridge:
await bridge.destroy()
logger.debug(f"bridgeID: {self.bridge_id} Destroyed bridge")
self.bridge_id = None
except Exception as exc:
logger.warning(f"Failed to destroy bridge: {self.bridge_id}Error: {exc}")
async def _sync_call_data(self, call_transfer_context: dict):
"""Sync call data to ARI_DATA_SYNCING_URI."""
if not os.getenv("ARI_DATA_SYNCING_URI"):
return
lead_id = call_transfer_context.get("lead_id")
status = call_transfer_context.get("disposition")
# {'lead_id': '299154', 'disposition': 'VM', 'agent_name': 'Alex', 'decision_maker': 'False', 'employment': 'N/A', 'debts': 'N/A', 'number_of_credit_cards': 'N/A', 'time': '2025-08-07T13:16:02-04:00'}
full_name = call_transfer_context.get("full_name", "")
phone = call_transfer_context.get("phone", "")
debts = call_transfer_context.get("debts", "")
employment = call_transfer_context.get("employment", "")
time = call_transfer_context.get("time", "")
comment = f"Type:Qualified!NName:{full_name}!NPhone:{phone}!NDebts:{debts}!NCC:N/A!NDM:Yes!NEmployment:{employment}!NTime:{time}!NVendor Id:!NStatus:{status}"
try:
if lead_id and status:
ari_data_uri = os.getenv("ARI_DATA_SYNCING_URI")
# Add URL params to the base URL
sync_url = f"{ari_data_uri}&lead_id={lead_id}&status={status}&comments={comment}"
logger.debug(
f"channelID: {self.caller_channel_id} Syncing data to ARI_DATA_SYNCING_URI: {sync_url}"
)
async with httpx.AsyncClient() as client:
response = await client.post(sync_url, timeout=10.0)
response.raise_for_status()
logger.info(
f"channelID: {self.caller_channel_id} Successfully synced data for lead_id: {lead_id} with status: {status}"
)
else:
logger.warning(
f"channelID: {self.caller_channel_id} Missing lead_id or status for syncing"
)
except Exception as e:
logger.error(
f"channelID: {self.caller_channel_id} Failed to sync data to ARI_DATA_SYNCING_URI: {e}"
)
async def disconnect(self):
"""Instruct Asterisk to hang-up the call and perform cleanup."""
if self._closed:
return
# Lets mark it as closed so that when we receive StasisEnd, we don't
# try to cleanup resource again
self._closed = True
# Clean up resources first
await self._cleanup_resources()
try:
if self.caller_channel_id:
caller_channel = await self._get_channel(self.caller_channel_id)
if caller_channel:
logger.debug(
f"channelID: {self.caller_channel_id} Hanging up caller channel"
)
await caller_channel.hangup()
except Exception:
logger.exception("Failed to hangup caller channel")
async def transfer(self, call_transfer_context: dict):
"""Transfer the call by continuing in dialplan with extracted variables."""
if self._closed:
return
# Lets mark it as closed so that when we receive StasisEnd, we don't
# try to cleanup resource again
self._closed = True
try:
# Clean up resources before transferring
await self._cleanup_resources()
if self.caller_channel_id:
caller_channel = await self._get_channel(self.caller_channel_id)
if caller_channel:
logger.debug(
f"channelID: {self.caller_channel_id} User qualified, continuing in dialplan "
f"REMOTE_DISPO_CALL_VARIABLES: {json.dumps(call_transfer_context)}"
)
# Sync data to ARI_DATA_SYNCING_URI
await self._sync_call_data(
call_transfer_context=call_transfer_context
)
await caller_channel.continueInDialplan()
except Exception:
logger.exception("Failed to transfer caller channel")
async def setup_call(self):
"""Setup the bridge and external media channel.
This must be called after initialization to establish the connection.
"""
await self._setup_call(self._host, self._port)
async def _setup_call(self, host: str, port: int):
"""Create externalMedia + bridge and notify that the call is connected."""
try:
em_channel_id = str(uuid.uuid4())
logger.debug(
f"channelID: {em_channel_id} Creating externalMedia channel on {host}:{port}"
)
client = self._ari
if not client:
raise RuntimeError("No ARI client available")
em_channel = await client.channels.externalMedia(
app=client.app,
channelId=em_channel_id,
external_host=f"{host}:{port}",
format="ulaw",
direction="both",
)
# Store the channel ID
self.em_channel_id = em_channel.id
# Create a mixing bridge and add both legs.
bridge = await client.bridges.create(type="mixing")
self.bridge_id = bridge.id
# Add channels individually as AsyncARIClient expects single channel per call
await bridge.addChannel(channel=self.caller_channel_id)
await bridge.addChannel(channel=self.em_channel_id)
# TODO: Figure out how can we get the remote public IP. Till then
# just pick it from the environment variable
# Get RTP addressing information
# ip = await em_channel.getChannelVar(
# variable="UNICASTRTP_LOCAL_ADDRESS"
# )
port = await em_channel.getChannelVar(variable="UNICASTRTP_LOCAL_PORT")
self.remote_addr = (
os.environ.get("ASTERISK_REMOTE_IP"),
int(port["value"]),
)
logger.debug(
f"channelID: {self.caller_channel_id} ARIManagerConnection connection resources ready "
f"(bridgeID: {self.bridge_id}), (emChannelID: {self.em_channel_id})"
f"remote address: {self.remote_addr}, local address: {self.local_addr}"
)
self._is_connected = True
except Exception as exc:
logger.exception(f"Error setting up ARIManagerConnection: {exc}")
await self._cleanup_resources()
async def notify_channel_end(self):
"""Notify that a channel has ended. Received after we get StasisEnd on the caller channel"""
if self._closed:
return
self._closed = True
self._is_connected = False
# Cleanup resources using the shared method
await self._cleanup_resources()
def __repr__(self):
"""Return string representation of connection."""
return (
f"<ARIManagerConnection id={self.id} caller={self.caller_channel_id} "
f"em={self.em_channel_id} bridge={self.bridge_id} state={'closed' if self._closed else 'open'}>"
)

View file

@ -11,6 +11,7 @@ from loguru import logger
from api.db import db_client
from api.enums import OrganizationConfigurationKey
from api.services.telephony.base import TelephonyProvider
from api.services.telephony.providers.ari_provider import ARIProvider
from api.services.telephony.providers.cloudonix_provider import CloudonixProvider
from api.services.telephony.providers.twilio_provider import TwilioProvider
from api.services.telephony.providers.vobiz_provider import VobizProvider
@ -75,6 +76,15 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
"domain_id": config.value.get("domain_id"),
"from_numbers": config.value.get("from_numbers", []),
}
elif provider == "ari":
return {
"provider": "ari",
"ari_endpoint": config.value.get("ari_endpoint"),
"app_name": config.value.get("app_name"),
"app_password": config.value.get("app_password"),
"inbound_workflow_id": config.value.get("inbound_workflow_id"),
"from_numbers": config.value.get("from_numbers", []),
}
else:
raise ValueError(f"Unknown provider in config: {provider}")
@ -115,6 +125,9 @@ async def get_telephony_provider(organization_id: int) -> TelephonyProvider:
elif provider_type == "cloudonix":
return CloudonixProvider(config)
elif provider_type == "ari":
return ARIProvider(config)
else:
raise ValueError(f"Unknown telephony provider: {provider_type}")
@ -127,4 +140,10 @@ async def get_all_telephony_providers() -> List[Type[TelephonyProvider]]:
Returns:
List of provider classes that can be used for webhook detection
"""
return [CloudonixProvider, TwilioProvider, VobizProvider, VonageProvider]
return [
ARIProvider,
CloudonixProvider,
TwilioProvider,
VobizProvider,
VonageProvider,
]

View file

@ -0,0 +1,420 @@
"""
Asterisk ARI (Asterisk REST Interface) implementation of the TelephonyProvider interface.
Uses ARI REST API to originate calls into a Stasis application.
The ARI WebSocket event listener runs as a separate process (ari_manager.py).
"""
import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from urllib.parse import urlparse
import aiohttp
from fastapi import HTTPException
from loguru import logger
from api.db import db_client
from api.enums import WorkflowRunMode
from api.services.telephony.base import (
CallInitiationResult,
NormalizedInboundData,
TelephonyProvider,
)
if TYPE_CHECKING:
from fastapi import WebSocket
class ARIProvider(TelephonyProvider):
"""
Asterisk ARI implementation of TelephonyProvider.
Uses ARI REST API for call control and relies on a separate
ari_manager process for WebSocket event listening.
"""
PROVIDER_NAME = WorkflowRunMode.ARI.value
WEBHOOK_ENDPOINT = None # ARI uses WebSocket events, not webhooks
def __init__(self, config: Dict[str, Any]):
"""
Initialize ARIProvider with configuration.
Args:
config: Dictionary containing:
- ari_endpoint: ARI base URL (e.g., http://asterisk:8088)
- app_name: Stasis application name
- app_password: ARI user password
- from_numbers: List of SIP extensions/numbers (optional)
"""
self.ari_endpoint = config.get("ari_endpoint", "").rstrip("/")
self.app_name = config.get("app_name", "")
self.app_password = config.get("app_password", "")
self.inbound_workflow_id = config.get("inbound_workflow_id")
self.from_numbers = config.get("from_numbers", [])
if isinstance(self.from_numbers, str):
self.from_numbers = [self.from_numbers]
self.base_url = f"{self.ari_endpoint}/ari"
def _get_auth(self) -> aiohttp.BasicAuth:
"""Generate BasicAuth for ARI API requests."""
return aiohttp.BasicAuth(self.app_name, self.app_password)
async def initiate_call(
self,
to_number: str,
webhook_url: str,
workflow_run_id: Optional[int] = None,
from_number: Optional[str] = None,
**kwargs: Any,
) -> CallInitiationResult:
"""
Initiate an outbound call via ARI.
Creates a channel in Asterisk using the ARI channels endpoint.
The channel is placed into the Stasis application where
the ari_manager will receive the StasisStart event.
"""
if not self.validate_config():
raise ValueError("ARI provider not properly configured")
endpoint = f"{self.base_url}/channels"
# Build the SIP endpoint string
# to_number can be a SIP URI or extension
if to_number.startswith("SIP/") or to_number.startswith("PJSIP/"):
sip_endpoint = to_number
else:
# Default to PJSIP technology
sip_endpoint = f"PJSIP/{to_number}"
# Prepare channel creation data
params = {
"endpoint": sip_endpoint,
"app": self.app_name,
"appArgs": ",".join(
filter(
None,
[
f"workflow_run_id={workflow_run_id}",
f"workflow_id={kwargs.get('workflow_id', '')}",
f"user_id={kwargs.get('user_id', '')}",
],
)
),
}
if from_number:
params["callerId"] = from_number
logger.info(
f"[ARI] Initiating call to {sip_endpoint} "
f"via app={self.app_name}, workflow_run_id={workflow_run_id}"
)
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint,
params=params,
auth=self._get_auth(),
) as response:
response_text = await response.text()
if response.status != 200:
logger.error(
f"[ARI] Channel creation failed: "
f"HTTP {response.status} - {response_text}"
)
raise HTTPException(
status_code=response.status,
detail=f"Failed to create ARI channel: {response_text}",
)
response_data = json.loads(response_text)
channel_id = response_data.get("id", "")
logger.info(
f"[ARI] Channel created: {channel_id} "
f"state={response_data.get('state')}"
)
return CallInitiationResult(
call_id=channel_id,
status=response_data.get("state", "created"),
provider_metadata={
"call_id": channel_id,
"channel_name": response_data.get("name", ""),
},
raw_response=response_data,
)
async def get_call_status(self, call_id: str) -> Dict[str, Any]:
"""Get channel status from ARI."""
if not self.validate_config():
raise ValueError("ARI provider not properly configured")
endpoint = f"{self.base_url}/channels/{call_id}"
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, auth=self._get_auth()) as response:
if response.status != 200:
error_data = await response.text()
raise Exception(f"Failed to get channel status: {error_data}")
return await response.json()
async def get_available_phone_numbers(self) -> List[str]:
"""Return configured extensions/numbers."""
return self.from_numbers
def validate_config(self) -> bool:
"""Validate ARI configuration."""
return bool(self.ari_endpoint and self.app_name and self.app_password)
async def verify_webhook_signature(
self, url: str, params: Dict[str, Any], signature: str
) -> bool:
"""ARI does not use webhook signatures - events come via WebSocket."""
return True
async def get_webhook_response(
self, workflow_id: int, user_id: int, workflow_run_id: int
) -> str:
"""ARI does not use webhook responses - call control is via REST API."""
logger.warning(
"get_webhook_response called for ARI - this should not happen. "
"ARI uses REST API for call control, not webhooks."
)
return ""
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
"""ARI/Asterisk does not provide call cost information."""
return {
"cost_usd": 0.0,
"duration": 0,
"status": "unknown",
"error": "ARI does not support cost retrieval",
}
def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse ARI event data into generic status callback format.
ARI events come from the WebSocket listener, not HTTP callbacks.
"""
# Map ARI channel states to common status format
state_map = {
"Up": "answered",
"Down": "completed",
"Ringing": "ringing",
"Ring": "ringing",
"Busy": "busy",
"Unavailable": "failed",
}
channel_state = data.get("channel", {}).get("state", "")
event_type = data.get("type", "")
# Determine status from event type
if event_type == "StasisStart":
status = "answered"
elif event_type == "StasisEnd":
status = "completed"
elif event_type == "ChannelDestroyed":
status = "completed"
else:
status = state_map.get(channel_state, channel_state.lower())
channel = data.get("channel", {})
return {
"call_id": channel.get("id", ""),
"status": status,
"from_number": channel.get("caller", {}).get("number"),
"to_number": channel.get("dialplan", {}).get("exten"),
"direction": None,
"duration": None,
"extra": data,
}
async def handle_websocket(
self,
websocket: "WebSocket",
workflow_id: int,
user_id: int,
workflow_run_id: int,
) -> None:
"""
Handle WebSocket connection from ARI externalMedia channel.
Unlike Twilio (which sends "connected" and "start" JSON messages),
Asterisk chan_websocket starts streaming audio immediately.
"""
from api.services.pipecat.run_pipeline import run_pipeline_ari
# Get channel_id from workflow run context
workflow_run = await db_client.get_workflow_run(workflow_run_id, user_id)
channel_id = ""
if workflow_run and workflow_run.gathered_context:
channel_id = workflow_run.gathered_context.get("call_id", "")
logger.info(
f"[ARI] Starting pipeline for workflow_run {workflow_run_id}, channel={channel_id}"
)
await run_pipeline_ari(
websocket, channel_id, workflow_id, workflow_run_id, user_id
)
# ======== INBOUND CALL METHODS ========
@classmethod
def can_handle_webhook(
cls, webhook_data: Dict[str, Any], headers: Dict[str, str]
) -> bool:
"""
ARI does not use HTTP webhooks for inbound calls.
Inbound calls are received via the ARI WebSocket event listener.
"""
return False
@staticmethod
def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData:
"""Parse ARI event data into normalized inbound format."""
channel = webhook_data.get("channel", {})
caller = channel.get("caller", {})
connected = channel.get("connected", {})
return NormalizedInboundData(
provider=ARIProvider.PROVIDER_NAME,
call_id=channel.get("id", ""),
from_number=caller.get("number", ""),
to_number=channel.get("dialplan", {}).get("exten", ""),
direction="inbound",
call_status=channel.get("state", ""),
account_id=None,
raw_data=webhook_data,
)
@staticmethod
def validate_account_id(config_data: dict, webhook_account_id: str) -> bool:
"""ARI doesn't use account IDs for validation."""
return True
def normalize_phone_number(self, phone_number: str) -> str:
"""Normalize phone number - ARI uses extensions as-is."""
return phone_number or ""
async def verify_inbound_signature(
self, url: str, webhook_data: Dict[str, Any], signature: str
) -> bool:
"""ARI authenticates via WebSocket connection credentials, not signatures."""
return True
@staticmethod
async def generate_inbound_response(
websocket_url: str, workflow_run_id: int = None
) -> tuple:
"""ARI does not generate HTTP responses for inbound calls."""
from fastapi import Response
return Response(content="", status_code=204)
@staticmethod
def generate_error_response(error_type: str, message: str) -> tuple:
"""Generate a generic JSON error response."""
from fastapi import Response
return Response(
content=json.dumps({"error": error_type, "message": message}),
media_type="application/json",
)
@staticmethod
def generate_validation_error_response(error_type) -> tuple:
"""Generate JSON error response for validation failures."""
from fastapi import Response
from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError
message = TELEPHONY_ERROR_MESSAGES.get(
error_type, TELEPHONY_ERROR_MESSAGES[TelephonyError.GENERAL_AUTH_FAILED]
)
return Response(
content=json.dumps({"error": str(error_type), "message": message}),
media_type="application/json",
)
# ======== CALL TRANSFER METHODS ========
def supports_transfers(self) -> bool:
"""ARI does not currently support call transfers."""
return False
async def transfer_call(
self,
destination: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
"""ARI call transfers are not yet implemented."""
raise NotImplementedError("ARI provider does not support call transfers")
# ======== ARI-SPECIFIC METHODS ========
async def hangup_channel(self, channel_id: str, reason: str = "normal") -> bool:
"""Hang up an ARI channel."""
endpoint = f"{self.base_url}/channels/{channel_id}"
params = {"reason_code": reason}
try:
async with aiohttp.ClientSession() as session:
async with session.delete(
endpoint, params=params, auth=self._get_auth()
) as response:
if response.status in (200, 204):
logger.info(f"[ARI] Channel {channel_id} hung up")
return True
else:
error = await response.text()
logger.error(
f"[ARI] Failed to hangup channel {channel_id}: {error}"
)
return False
except Exception as e:
logger.error(f"[ARI] Exception hanging up channel {channel_id}: {e}")
return False
async def answer_channel(self, channel_id: str) -> bool:
"""Answer an ARI channel."""
endpoint = f"{self.base_url}/channels/{channel_id}/answer"
try:
async with aiohttp.ClientSession() as session:
async with session.post(endpoint, auth=self._get_auth()) as response:
if response.status in (200, 204):
logger.info(f"[ARI] Channel {channel_id} answered")
return True
else:
error = await response.text()
logger.error(
f"[ARI] Failed to answer channel {channel_id}: {error}"
)
return False
except Exception as e:
logger.error(f"[ARI] Exception answering channel {channel_id}: {e}")
return False
def get_ws_url(self) -> str:
"""Get the ARI WebSocket URL for event listening."""
parsed = urlparse(self.ari_endpoint)
ws_scheme = "wss" if parsed.scheme == "https" else "ws"
return (
f"{ws_scheme}://{parsed.netloc}/ari/events"
f"?api_key={self.app_name}:{self.app_password}"
f"&app={self.app_name}"
f"&subscribeAll=true"
)

View file

@ -1,184 +0,0 @@
"""Redis communication protocol for distributed ARI architecture.
Defines message formats and helpers for ARI Manager <-> Worker communication.
"""
import json
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
class EventType(str, Enum):
"""Types of events sent from ARI Manager to Workers."""
STASIS_START = "stasis_start"
STASIS_END = "stasis_end"
CHANNEL_UPDATE = "channel_update"
ERROR = "error"
class CommandType(str, Enum):
"""Types of commands sent from Workers to ARI Manager."""
DISCONNECT = "disconnect"
TRANSFER = "transfer"
UPDATE_STATE = "update_state"
SOCKET_CLOSED = "socket_closed"
@dataclass
class BaseWorkerToARIManagerCommand:
"""Base class for all commands sent from Workers to ARI Manager."""
type: str
channel_id: str = ""
def to_json(self) -> str:
return json.dumps(asdict(self))
@classmethod
def from_json(cls, data: str):
return cls(**json.loads(data))
@dataclass
class StasisStartEvent:
"""Event sent when a new call is bridged and ready."""
type: str = EventType.STASIS_START
channel_id: str = ""
caller_channel_id: str = ""
em_channel_id: Optional[str] = None
bridge_id: Optional[str] = None
local_addr: List[Any] = None # [host, port]
remote_addr: Optional[List[Any]] = None # [host, port] with UNICASTRTP_LOCAL_PORT
call_context_vars: Dict[str, Any] = None
def __post_init__(self):
if self.local_addr is None:
self.local_addr = []
if self.call_context_vars is None:
self.call_context_vars = {}
def to_json(self) -> str:
return json.dumps(asdict(self))
@classmethod
def from_json(cls, data: str) -> "StasisStartEvent":
return cls(**json.loads(data))
@dataclass
class StasisEndEvent:
"""Event sent when a call ends."""
type: str = EventType.STASIS_END
channel_id: str = ""
reason: Optional[str] = None
def to_json(self) -> str:
return json.dumps(asdict(self))
@classmethod
def from_json(cls, data: str) -> "StasisEndEvent":
return cls(**json.loads(data))
@dataclass
class DisconnectCommand(BaseWorkerToARIManagerCommand):
"""Command to disconnect a call."""
type: str = CommandType.DISCONNECT
reason: str = "worker_requested"
@dataclass
class TransferCommand(BaseWorkerToARIManagerCommand):
"""Command to transfer a call."""
type: str = CommandType.TRANSFER
context: Dict[str, Any] = None
def __post_init__(self):
if self.context is None:
self.context = {}
@dataclass
class SocketClosedCommand(BaseWorkerToARIManagerCommand):
"""Command to notify that RTP sockets have been closed."""
type: str = CommandType.SOCKET_CLOSED
class RedisChannels:
"""Redis channel naming conventions."""
@staticmethod
def worker_events(worker_id: str) -> str:
"""Channel for events sent to a specific worker."""
return f"ari:events:worker:{worker_id}"
@staticmethod
def channel_commands(channel_id: str) -> str:
"""Channel for commands related to a specific call channel."""
return f"ari:commands:{channel_id}"
@staticmethod
def channel_updates(channel_id: str) -> str:
"""Channel for state updates about a specific call."""
return f"ari:updates:{channel_id}"
class RedisKeys:
"""Redis key naming conventions for worker registration and discovery."""
@staticmethod
def worker_active(worker_id: str) -> str:
"""Key for active worker status and metadata."""
return f"workers:active:{worker_id}"
@staticmethod
def workers_set() -> str:
"""Set containing all registered worker IDs."""
return "workers:set"
@staticmethod
def round_robin_index() -> str:
"""Counter for round-robin worker selection."""
return "workers:round_robin:index"
def parse_event(data: str) -> Any:
"""Parse a Redis event message."""
try:
parsed = json.loads(data)
event_type = parsed.get("type")
if event_type == EventType.STASIS_START:
return StasisStartEvent(**parsed)
elif event_type == EventType.STASIS_END:
return StasisEndEvent(**parsed)
else:
return parsed
except Exception:
return None
def parse_command(data: str) -> Any:
"""Parse a Redis command message."""
try:
parsed = json.loads(data)
cmd_type = parsed.get("type")
if cmd_type == CommandType.DISCONNECT:
return DisconnectCommand(**parsed)
elif cmd_type == CommandType.TRANSFER:
return TransferCommand(**parsed)
elif cmd_type == CommandType.SOCKET_CLOSED:
return SocketClosedCommand(**parsed)
else:
return parsed
except Exception:
return None

View file

@ -1,315 +0,0 @@
"""Low-level RTP transport for Asterisk externalMedia sessions.
stasis_rtp_client.py
~~~~~~~~~~~~~~~~~~~~
* Sends and receives **proper RTP/UDP** (PT 0 PCMU/μ-law).
* Uses 20 ms frames (160 bytes payload) by default; automatically
chunks or concatenates data so timestamps stay correct.
* Verifies the RTP header on the receive path (SSRC and PT).
"""
import asyncio
import secrets
import socket
import struct
from typing import TYPE_CHECKING, AsyncIterator, Optional
from loguru import logger
if TYPE_CHECKING:
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from api.services.telephony.stasis_rtp_transport import StasisRTPCallbacks
# ─────────────────────────────────────────────────────────────────── helpers
_RTP_HDR = struct.Struct("!BBHII") # v/p/x/cc, m/pt, seq, ts, ssrc
_PT_PCMU = 0 # static payload type for μ-law
class _RTPEncoder:
"""Builds PCMU RTP headers for the packets we SEND to Asterisk."""
def __init__(self):
self.ssrc = secrets.randbits(32)
self.seq = secrets.randbits(16)
self.ts = 0 # incremented by #payload bytes
def pack(self, payload: bytes, mark=False) -> bytes:
b0 = 0x80 # V=2
b1 = (0x80 if mark else 0x00) | _PT_PCMU
hdr = _RTP_HDR.pack(b0, b1, self.seq, self.ts, self.ssrc)
self.seq = (self.seq + 1) & 0xFFFF
self.ts += len(payload) # 1 sample/byte @ 8 kHz
return hdr + payload
class _RTPDecoder:
"""Very forgiving RTP decoder.
Latches on the first valid packet and then insists
that SSRC & PT match afterwards. Returns *None* if the packet
should be ignored.
"""
def __init__(self):
self.peer_ssrc: int | None = None # learned from first packet
def unpack(self, packet: bytes) -> bytes | None:
if len(packet) < _RTP_HDR.size:
return None
b0, b1, seq, ts, ssrc = _RTP_HDR.unpack_from(packet)
if (b0 & 0xC0) != 0x80: # RTP v2?
return None
if (b1 & 0x7F) != _PT_PCMU: # payload-type 0?
return None
if self.peer_ssrc is None:
self.peer_ssrc = ssrc # latch on first good packet
elif ssrc != self.peer_ssrc:
return None # stray stream drop
return packet[_RTP_HDR.size :]
# ──────────────────────────────────────────────────────────────── client
class StasisRTPClient:
"""Low-level wrapper around StasisRTPConnection.
Public API
await setup(start_frame) kept for parity (does nothing)
await connect()
async for payload in receive(): # μ-law bytes (20 ms each)
await send(data) # any length; will be chunked
await disconnect()
"""
_FRAME_SIZE = 160 # 20 ms @ 8 kHz PCMU
def __init__(
self,
connection: "StasisRTPConnection",
callbacks: "StasisRTPCallbacks",
):
"""Initialize Stasis RTP client.
Args:
connection: RTP connection parameters.
callbacks: Callback handlers for transport events.
"""
from typing import Any
self._connection = connection
self._callbacks = callbacks
self._encoder = _RTPEncoder()
self._decoder = _RTPDecoder()
self._recv_sock: Optional[socket.socket] = None
self._send_sock: Optional[socket.socket] = None
self._closing = False
self._recv_sock_ready = asyncio.Event() # Signal when recv socket is ready
self._leave_counter = 0 # Track input/output transport usage
# ── wire event handlers to the connection ────────────────
@self._connection.event_handler("connected")
async def _on_connected(_: Any):
await self._setup_sockets()
await self._callbacks.on_client_connected(
self._connection.caller_channel_id
)
@self._connection.event_handler("disconnected")
async def _on_disconnected(_: Any):
logger.debug("In _on_disconnected of StasisRTPClient")
await self._callbacks.on_client_disconnected(
self._connection.caller_channel_id
)
# ─── public helpers ──────────────────────────────────────────
async def setup(self, _):
"""Setup method for compatibility."""
self._leave_counter += 1
async def connect(self):
"""Connect to the RTP socket."""
if self._connection.is_connected():
return
await self._connection.connect()
async def disconnect(self):
"""Disconnect from the RTP socket."""
# Decrement leave counter when disconnect is called
logger.debug(f"StasisRTPClient.disconnect leave_counter: {self._leave_counter}")
self._leave_counter -= 1
if self._leave_counter > 0:
# Early return - InputTransport called first, OutputTransport will call later
# Only proceed when counter reaches 0 (OutputTransport's call)
return
# Close sockets
logger.debug("Going to close sockets")
await self._close_sockets()
if self._closing:
# We might have received the disconnected callback from the StasisRTPConnection
# due to user hangup. We will just return. We have already closed the sockets
# in disconnected callback handler.
return
self._closing = True
# If we have initiated transfer before, we would ignore _connection.disconnect()
# in the connection. (since is_closing would be set by transfer)
try:
await self._connection.disconnect()
except Exception as exc:
logger.error(f"Failed to disconnect RTP connection: {exc}")
# ─── socket management ──────────────────────────────────────
async def _setup_sockets(self):
if self._recv_sock and self._send_sock:
return
logger.debug(
f"Setting up Sockets - local {self._connection.local_addr}, remote: {self._connection.remote_addr}"
)
# receive socket bind to local address provided by connection
if not self._recv_sock:
rs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rs.setblocking(False)
rs.bind(self._connection.local_addr)
self._recv_sock = rs
self._recv_sock_ready.set() # Signal that recv socket is ready
# send socket connect to remote (Asterisk) address
if not self._send_sock:
ss = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ss.setblocking(False)
ss.connect(self._connection.remote_addr)
self._send_sock = ss
logger.debug(
f"Socket setup complete - recv_fd: {self._recv_sock.fileno()}, send_fd: {self._send_sock.fileno()}"
)
async def _close_sockets(self):
"""Safely close sockets with proper error handling."""
for sock_name, sock in [("recv", self._recv_sock), ("send", self._send_sock)]:
if sock:
try:
# Shutdown the socket first to break any pending operations
sock.shutdown(socket.SHUT_RDWR)
except OSError:
# Socket might already be closed or in a bad state
pass
try:
sock.close()
except Exception as exc:
logger.debug(f"Error closing {sock_name} socket: {exc}")
self._recv_sock = None
self._send_sock = None
self._recv_sock_ready.clear() # Reset the event for potential reconnection
# Notify the connection that sockets are closed so ARI Manager can clean up ports
await self._connection.notify_sockets_closed()
logger.debug("Closed sockets in StasisRTPClient")
# ─── receive path ────────────────────────────────────────────
async def receive(self) -> AsyncIterator[bytes]:
"""Async generator yielding μ-law frames (exactly 160 bytes each).
Silently drops any packet whose RTP header does not match our SSRC/PT.
"""
loop = asyncio.get_running_loop()
# Wait for recv socket to be created
try:
await self._recv_sock_ready.wait()
except asyncio.CancelledError:
return
logger.debug("Going to receive from the socket now")
while not self._closing:
try:
# each loop gets 172 bytes UDP packet, which is 160 bytes of
# audio data (Asterisk sends 20ms audio chunks with 8k sample rate)
# and 12 bytes of RTP header
data = await loop.sock_recv(self._recv_sock, 2048)
except asyncio.CancelledError:
logger.debug("RTP receive task cancelled")
break
except (OSError, socket.error) as exc:
logger.warning(f"RTP receive failed (socket closed): {exc}")
break
except Exception as exc:
logger.debug(f"Unexpected error in receive: {exc}")
break
payload = self._decoder.unpack(data)
if payload is None:
continue # header failed validation
# In practice Asterisk sends 20 ms frames assert just in case.
if len(payload) != self._FRAME_SIZE:
logger.warning(f"Dropping non-20 ms packet len={len(payload)}")
continue
yield payload
# ─── send path ───────────────────────────────────────────────
async def send(self, data: bytes):
"""Send μ-law data of arbitrary length.
Splits/aggregates into 160-byte chunks before RTP-wrapping.
"""
if self._closing or not self._send_sock:
return
loop = asyncio.get_running_loop()
# chunk/concat to 160-byte frames
chunks = self._chunk_ulaw(data, self._FRAME_SIZE)
for i, chunk in enumerate(chunks):
mark = i == 0 # set marker on the first packet of talk-spurt
packet = self._encoder.pack(chunk, mark=mark)
try:
await loop.sock_sendall(self._send_sock, packet)
except (OSError, socket.error) as exc:
logger.warning(f"RTP send failed (socket closed): {exc}")
break
except Exception as exc:
logger.error(f"RTP send failed: {exc}")
break
def _chunk_ulaw(self, buf: bytes, size: int) -> list[bytes]:
"""Split / aggregate μ-law bytes to exact *size* multiples.
If buf length is not a multiple of *size*, pad the last chunk with 0xFF
(silence). That keeps timestamps monotonic.
"""
if not buf:
return []
if len(buf) % size:
pad = size - (len(buf) % size)
buf += b"\xff" * pad
return [buf[i : i + size] for i in range(0, len(buf), size)]
# ─── properties ──────────────────────────────────────────────
@property
def is_connected(self) -> bool:
"""Check if client is connected."""
return self._connection.is_connected() and not self._closing
@property
def is_closing(self) -> bool:
"""Check if client is closing."""
return self._closing

View file

@ -1,191 +0,0 @@
"""Stasis RTP connection for worker processes - is used by stasis rtp transport.
This connection works without direct ARI access and communicates with
the ARI Manager via Redis for all control operations.
"""
from typing import Optional, Tuple
import redis.asyncio as aioredis
from loguru import logger
from api.services.telephony.stasis_event_protocol import (
DisconnectCommand,
RedisChannels,
SocketClosedCommand,
TransferCommand,
)
from pipecat.utils.base_object import BaseObject
class StasisRTPConnection(BaseObject):
"""Worker-side connection that communicates with ARI Manager via Redis.
This class provides the same API as the original StasisRTPConnection but
without direct ARI client access. All channel operations are delegated
to the ARI Manager process via Redis.
"""
_SUPPORTED_EVENTS = [
"connecting",
"connected",
"disconnected",
"closed",
"failed",
"new",
]
def __init__(
self,
redis_client: aioredis.Redis,
channel_id: str,
caller_channel_id: str,
em_channel_id: Optional[str],
bridge_id: Optional[str],
local_addr: Optional[Tuple[str, int]],
remote_addr: Optional[Tuple[str, int]],
workflow_run_id: Optional[int] = None,
):
"""Initialize distributed connection with pre-established details.
Args:
redis_client: Redis client for communication
channel_id: Primary channel ID for this connection
caller_channel_id: Caller's channel ID
em_channel_id: External media channel ID
bridge_id: Bridge ID (already created by ARI Manager)
local_addr: Local RTP address (host, port)
remote_addr: Remote RTP address with UNICASTRTP_LOCAL_PORT
workflow_run_id: Workflow run ID for logging context
"""
super().__init__()
self.redis = redis_client
self.channel_id = channel_id
self.caller_channel_id = caller_channel_id
self.em_channel_id = em_channel_id
self.bridge_id = bridge_id
self.workflow_run_id = workflow_run_id
# RTP addressing (same as StasisRTPConnection)
self.local_addr = local_addr
self.remote_addr = remote_addr
# State tracking
# self._closed_by_stasis_end should only be set True after we get
# StasisEnd from the transport
self._closed_by_stasis_end = False
# self._closing should be True if we have received disconnect
# or transfer request
self._closing = False
self._connect_invoked = False
# Register event handlers
for evt in self._SUPPORTED_EVENTS:
self._register_event_handler(evt)
logger.debug(
f"channelID: {channel_id} StasisRTPConnection created: "
f"bridgeID: {bridge_id}, local_addr={local_addr}, remote_addr={remote_addr}"
)
async def connect(self):
"""Signal readiness to start the call.
Since the bridge is already established by ARI Manager,
we can immediately trigger the connected event.
"""
self._connect_invoked = True
if self.is_connected():
await self._call_event_handler("connected")
else:
logger.warning(
"StasisRTPConnection is not connected - did not call connected handler"
)
async def disconnect(self):
"""Request disconnection via Redis command to ARI Manager. Usually called
when there is a disconnect triggered by workflow"""
# If we have already received user hangup via StasisEnd, lets
# return
if self._closed_by_stasis_end or self._closing:
return
self._closing = True
logger.info(f"channelID: {self.channel_id} Requesting disconnect")
# Send disconnect command to ARI Manager
command = DisconnectCommand(channel_id=self.channel_id)
channel = RedisChannels.channel_commands(self.channel_id)
await self.redis.publish(channel, command.to_json())
async def transfer(self, call_transfer_context: dict):
"""Request call transfer via Redis command to ARI Manager."""
# If we have already received user hangup via StasisEnd, lets
# return
if self._closed_by_stasis_end or self._closing:
return
self._closing = True
logger.info(f"channelID: {self.channel_id} Requesting transfer")
# Send transfer command to ARI Manager
command = TransferCommand(
channel_id=self.channel_id, context=call_transfer_context
)
channel = RedisChannels.channel_commands(self.channel_id)
await self.redis.publish(channel, command.to_json())
async def notify_sockets_closed(self):
"""Notify ARI Manager that RTP sockets have been closed."""
logger.info(
f"channelID: {self.channel_id} Notifying ARI Manager that sockets are closed"
)
# Send socket_closed command to ARI Manager
command = SocketClosedCommand(channel_id=self.channel_id)
channel = RedisChannels.channel_commands(self.channel_id)
await self.redis.publish(channel, command.to_json())
def is_connected(self) -> bool:
"""Check if connection is established.
Returns True once connect() has been called and connection is not closed.
"""
return (
self._connect_invoked
and not self._closed_by_stasis_end
and not self._closing
)
async def handle_remote_disconnect(self):
"""Handle disconnection initiated by ARI Manager. Is called when the user hangs up."""
if self._closed_by_stasis_end or self._closing:
return
self._closed_by_stasis_end = True
if self._connect_invoked:
# Unless self._connect_invoked is True, the event handlers won't be registered. We only
# register the event handler of client when the transports are initiated during pipeline
# initialisation. Any caller must check and wait for _connect_invoked before
# calling the method
await self._call_event_handler("disconnected")
else:
logger.warning(
f"ChannelID: {self.channel_id} Got remote disconnect before connection was invoked"
)
logger.info(f"channelID: {self.channel_id} StasisRTPConnection disconnected")
def __repr__(self):
"""String representation of connection."""
return (
f"<StasisRTPConnection id={self.id} channel={self.channel_id} "
f"caller={self.caller_channel_id} em={self.em_channel_id} "
f"state={'closed' if self._closed_by_stasis_end else 'open'}>"
)

View file

@ -1,116 +0,0 @@
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
"""Stasis RTP frame serializer.
This serializer converts between Pipecat frames and the raw μ-law RTP payload
stream expected by an Stasis *External Media* channel.
The serializer:
* Down-samples PCM to 8-kHz μ-law for **outgoing** audio (:class:`AudioRawFrame`).
* Up-samples μ-law to the pipeline's native rate for **incoming** audio.
"""
from typing import Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import create_default_resampler, pcm_to_ulaw, ulaw_to_pcm
from pipecat.frames.frames import (
AudioRawFrame,
Frame,
InputAudioRawFrame,
StartFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer
class StasisRTPFrameSerializer(FrameSerializer):
"""Serializer for Asterisk External Media streams (raw μ-law)."""
class InputParams(BaseModel):
"""Configuration parameters.
Attributes:
----------
stasis_sample_rate : int, default 8000
The sample-rate used by Stasis when sending μ-law (PCMU).
sample_rate : Optional[int]
Override for the pipeline's *input* sample-rate. When omitted the
value from the :class:`StartFrame` is used.
"""
stasis_sample_rate: int = 8000
sample_rate: Optional[int] = None
def __init__(self, params: Optional[InputParams] = None):
"""Initialize Stasis RTP frame serializer.
Args:
params: Optional configuration parameters for the serializer.
"""
self._params = params or self.InputParams()
# Wire / pipeline rates
self._stasis_sample_rate = self._params.stasis_sample_rate
self._sample_rate = 0 # pipeline rate, filled in *setup*
# Resampler shared between encode / decode paths
self._resampler = create_default_resampler()
async def setup(self, frame: StartFrame):
"""Remember pipeline configuration."""
self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate
async def serialize(self, frame: Frame) -> bytes | str | None:
"""Convert a Pipecat frame to a wire payload.
Only :class:`AudioRawFrame` instances are translated all other frame
types are silently ignored, allowing higher-level transports to deal
with them as needed.
"""
if isinstance(frame, AudioRawFrame):
try:
# Pipeline PCM → 8-kHz μ-law
encoded = await pcm_to_ulaw(
frame.audio,
frame.sample_rate,
self._stasis_sample_rate,
self._resampler,
)
return encoded # raw bytes
except Exception as exc: # pragma: no cover robustness
logger.error(
f"StasisRTPFrameSerializer.serialize: encode failed: {exc}"
)
return None
# Non-audio frames are not transmitted on the media path
return None
async def deserialize(self, data: bytes | str) -> Frame | None:
"""Convert wire payloads to Pipecat frames.
The Stasis media socket delivers bare μ-law bytes, therefore *data*
must be *bytes*. Any *str* is ignored.
"""
if not isinstance(data, (bytes, bytearray)):
return None
try:
pcm = await ulaw_to_pcm(
bytes(data),
self._stasis_sample_rate,
self._sample_rate,
self._resampler,
)
return InputAudioRawFrame(
audio=pcm,
sample_rate=self._sample_rate,
num_channels=1,
)
except Exception as exc: # pragma: no cover
logger.error(f"StasisRTPFrameSerializer.deserialize: decode failed: {exc}")
return None

View file

@ -1,300 +0,0 @@
# transports/ari_external_media.py (new file)
"""Stasis RTP transport for Asterisk External Media integration."""
import asyncio
import time
from typing import Awaitable, Callable, Optional
from loguru import logger
from pydantic import BaseModel
from api.services.telephony.stasis_rtp_client import StasisRTPClient
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
InputAudioRawFrame,
OutputAudioRawFrame,
StartFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
class StasisRTPTransportParams(TransportParams):
"""Transport parameters for Stasis RTP transport."""
serializer: FrameSerializer
class StasisRTPCallbacks(BaseModel):
"""Callbacks for Stasis RTP transport events."""
on_client_connected: Callable[[str], Awaitable[None]]
on_client_disconnected: Callable[[str], Awaitable[None]]
on_client_closed: Callable[[str], Awaitable[None]]
# ------------------------------------------------ Input Transport -------------------------
"""
Transport calls client receive to receive the audio from the socket. This happens in the self._receive_audio task.
Then the audio frames are pushed to _audio_in_queue using push_audio_frame method. Then the _audio_task_handler processes
the frames from the _audio_in_queue and pushes them to the VAD analyzer, turn analyzer and pushes the audio
further downstream to tts.
The BaseInputTransport pipeline is responsible for:
- Resampling the audio to the correct sample rate
- Applying the audio filter
- Pushing the audio frames to the VAD analyzer
- Pushing the audio frames to the turn analyzer
- Pushing the audio frames to the bot interruption analyzer
- Pushing the audio frames down the pipeline to the tts
stop method is called from process_frame of the BaseInputTransport. super.stop() stops _audio_task_handler. It then
calls _client.disconnect. Transport's callbacks are sent to the client using StasisRTPCallbacks.
"""
class StasisRTPInputTransport(BaseInputTransport):
"""Input transport for receiving audio over Stasis RTP."""
def __init__(
self,
transport: BaseTransport,
client: StasisRTPClient,
params: StasisRTPTransportParams,
**kwargs,
):
"""Initialize Stasis RTP input transport.
Args:
transport: Parent transport instance.
client: Stasis RTP client for socket communication.
params: Transport parameters including serializer.
**kwargs: Additional keyword arguments for BaseInputTransport.
"""
super().__init__(params, **kwargs)
self._transport = transport
self._client = client
self._params = params
self._receive_task: Optional[asyncio.Task] = None
async def start(self, frame: StartFrame):
"""Start the input transport."""
await super().start(frame)
await self._client.setup(frame)
await self._params.serializer.setup(frame)
# Ensure underlying connection is established and socket ready.
await self._client.connect()
if not self._receive_task:
self._receive_task = self.create_task(self._receive_audio())
await self.set_transport_ready(frame)
async def _stop_tasks(self):
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
async def stop(self, frame: EndFrame):
"""Stop the input transport."""
await super().stop(frame)
await self._stop_tasks()
await self._client.disconnect()
logger.debug("Successfully disconnected from StasisRTPClient")
async def cancel(self, frame: CancelFrame):
"""Cancel the input transport."""
await super().cancel(frame)
await self._stop_tasks()
await self._client.disconnect()
async def _receive_audio(self):
try:
async for payload in self._client.receive():
frame = await self._params.serializer.deserialize(payload)
if not frame:
continue
if isinstance(frame, InputAudioRawFrame):
await self.push_audio_frame(frame)
else:
await self.push_frame(frame)
except Exception as exc:
logger.error(f"StasisRTPInputTransport receive error: {exc}")
# No app-messages in RTP path, but keep compatibility
async def push_app_message(self, message):
"""Push app message (not supported in RTP transport)."""
logger.debug("StasisRTPInputTransport received app message ignored (RTP only)")
# ------------------------------------------------ Output Transport ------------------------
class StasisRTPOutputTransport(BaseOutputTransport):
"""Output transport for sending audio over Stasis RTP."""
def __init__(
self,
transport: BaseTransport,
client: StasisRTPClient,
params: StasisRTPTransportParams,
**kwargs,
):
"""Initialize Stasis RTP output transport.
Args:
transport: Parent transport instance.
client: Stasis RTP client for socket communication.
params: Transport parameters including serializer.
**kwargs: Additional keyword arguments for BaseOutputTransport.
"""
super().__init__(params, **kwargs)
self._transport = transport
self._client = client
self._params = params
# Pace outgoing audio so we don't dump buffers instantly (simulate 10-ms chunks)
self._send_interval: float = 0
self._next_send_time: float = 0
async def start(self, frame: StartFrame):
"""Start the output transport."""
await super().start(frame)
await self._client.setup(frame)
await self._params.serializer.setup(frame)
self._send_interval = self._params.audio_out_10ms_chunks * 10 / 1000 # ms
await self.set_transport_ready(frame)
async def stop(self, frame: EndFrame):
"""Stop the output transport."""
await super().stop(frame)
await self._client.disconnect()
async def cancel(self, frame: CancelFrame):
"""Cancel the output transport."""
await super().cancel(frame)
await self._client.disconnect()
async def send_message(
self, frame: TransportMessageFrame | TransportMessageUrgentFrame
):
"""Send message frame (not supported in RTP transport)."""
# RTP path has no generic message channel; ignore.
pass
async def write_audio_frame(self, frame: OutputAudioRawFrame):
"""Write audio frame to RTP stream."""
if self._client.is_closing:
return False
if not self._client.is_connected:
# If not connected yet, just simulate playback delay.
await self._write_audio_sleep()
return
payload = await self._params.serializer.serialize(frame)
if payload:
await self._client.send(payload)
await self._write_audio_sleep()
async def _write_audio_sleep(self):
"""Simulates real-time audio playback timing by introducing controlled delays.
This method implements a clock simulation to pace audio transmission at realistic
intervals. Without this pacing, audio frames would be sent as fast as possible,
which could overwhelm receivers or cause buffering issues.
The method:
1. Calculates how long to sleep based on when the next frame should be sent
2. Sleeps for the calculated duration (or 0 if we're already behind schedule)
3. Updates _next_send_time for the next audio chunk
The _send_interval is computed as: (audio_chunk_size / sample_rate) / 2
This creates timing that simulates how an actual audio device would output
audio at the proper rate (e.g., every 10ms for 10ms audio chunks).
"""
current_time = time.monotonic()
sleep_duration = max(0, self._next_send_time - current_time)
await asyncio.sleep(sleep_duration)
if sleep_duration == 0:
self._next_send_time = time.monotonic() + self._send_interval
else:
self._next_send_time += self._send_interval
class StasisRTPTransport(BaseTransport):
"""Main transport class for Stasis RTP communication."""
def __init__(
self,
stasis_connection: StasisRTPConnection,
params: StasisRTPTransportParams,
input_name: Optional[str] = None,
output_name: Optional[str] = None,
):
"""Initialize Stasis RTP transport.
Args:
stasis_connection: Connection parameters for Stasis RTP.
params: Transport parameters including serializer.
input_name: Optional name for input transport.
output_name: Optional name for output transport.
"""
super().__init__(input_name=input_name, output_name=output_name)
self._params = params
client_callbacks = StasisRTPCallbacks(
on_client_connected=self._on_client_connected,
on_client_disconnected=self._on_client_disconnected,
on_client_closed=self._on_client_closed,
)
self._client = StasisRTPClient(stasis_connection, client_callbacks)
self._input = StasisRTPInputTransport(
self, self._client, self._params, name=self._input_name
)
self._output = StasisRTPOutputTransport(
self, self._client, self._params, name=self._output_name
)
# expose handlers
self._register_event_handler("on_client_connected")
self._register_event_handler("on_client_disconnected")
self._register_event_handler("on_client_closed")
def input(self) -> StasisRTPInputTransport:
"""Get the input transport."""
return self._input
def output(self) -> StasisRTPOutputTransport:
"""Get the output transport."""
return self._output
# ------------------------------------------------ event adapters ----------
async def _on_client_connected(self, chan_id: str):
await self._call_event_handler("on_client_connected", chan_id)
async def _on_client_disconnected(self, chan_id: str):
await self._call_event_handler("on_client_disconnected", chan_id)
async def _on_client_closed(self, chan_id: str):
await self._call_event_handler("on_client_closed", chan_id)

View file

@ -1,105 +0,0 @@
#!/usr/bin/env python3
"""Test script to verify asyncari ping functionality."""
import asyncio
import os
import sys
from pathlib import Path
# Add the asyncari src to Python path for testing
asyncari_path = Path(__file__).parent.parent.parent.parent.parent / "asyncari" / "src"
sys.path.insert(0, str(asyncari_path))
import asyncari
from loguru import logger
async def test_ping():
"""Test the ping functionality with asyncari."""
# Configure from environment or use defaults
base_url = os.getenv("ARI_STASIS_ENDPOINT", "http://localhost:8088")
username = os.getenv("ARI_STASIS_USER", "asterisk")
password = os.getenv("ARI_STASIS_USER_PASSWORD", "asterisk")
apps = os.getenv("ARI_STASIS_APP_NAME", "test-app")
logger.info(f"Connecting to ARI at {base_url}")
try:
async with asyncari.connect(
base_url=base_url, apps=apps, username=username, password=password
) as client:
logger.info("Connected to ARI")
# Test REST API ping
logger.info("Testing REST API ping...")
result = await client.asterisk.ping()
logger.info(f"REST API ping successful: {result}")
# Test WebSocket ping (should work with our wrapper)
logger.info("Testing WebSocket ping...")
for ws in client.websockets:
try:
await ws.ping()
logger.info("WebSocket ping() called successfully (no-op)")
except AttributeError:
logger.error("WebSocket doesn't have ping() method")
except Exception as e:
logger.error(f"WebSocket ping failed: {e}")
# Test the keep_alive function
from ari_client_manager import keep_alive
logger.info("Starting keep_alive task...")
keep_alive_task = asyncio.create_task(keep_alive(client, interval=5.0))
# Run for 20 seconds to see several pings
await asyncio.sleep(20)
# Cancel keep_alive
keep_alive_task.cancel()
try:
await keep_alive_task
except asyncio.CancelledError:
logger.info("keep_alive task cancelled")
logger.info("Test completed successfully!")
except Exception as e:
logger.exception(f"Test failed: {e}")
return False
return True
async def test_with_manager():
"""Test using the ARI client manager."""
from ari_client_manager import setup_ari_client_supervisor
async def on_stasis_call(client, channel, context_vars):
logger.info(f"Received call: {channel.id}")
# Enable ARI Stasis for testing
os.environ["ENABLE_ARI_STASIS"] = "true"
supervisor = await setup_ari_client_supervisor(on_stasis_call)
if supervisor:
logger.info("ARI Stasis supervisor started with ping support")
# Run for 30 seconds
await asyncio.sleep(30)
await supervisor.close()
logger.info("Supervisor closed")
else:
logger.error("Failed to start supervisor")
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "manager":
asyncio.run(test_with_manager())
else:
asyncio.run(test_ping())

View file

@ -1,83 +0,0 @@
#!/usr/bin/env python3
"""Test script to verify real WebSocket ping frames are being sent."""
import asyncio
import os
import sys
from pathlib import Path
# Add the asyncari src to Python path
asyncari_path = Path(__file__).parent.parent.parent.parent.parent / "asyncari" / "src"
sys.path.insert(0, str(asyncari_path))
import asyncari
from loguru import logger
# Enable debug logging to see ping frames
logger.add(sys.stderr, level="DEBUG")
async def test_real_ping():
"""Test that real WebSocket ping frames are sent."""
# Configure from environment or use defaults
base_url = os.getenv("ARI_STASIS_ENDPOINT", "http://localhost:8088")
username = os.getenv("ARI_STASIS_USER", "asterisk")
password = os.getenv("ARI_STASIS_USER_PASSWORD", "asterisk")
apps = os.getenv("ARI_STASIS_APP_NAME", "test-app")
logger.info(f"Connecting to ARI at {base_url}")
try:
async with asyncari.connect(
base_url=base_url, apps=apps, username=username, password=password
) as client:
logger.info("Connected to ARI")
# Get the WebSocket
for ws in client.websockets:
logger.info(f"WebSocket type: {type(ws)}")
logger.info(
f"WebSocket wrapper active: {'WebSocketWrapper' in str(type(ws))}"
)
# Check internal structure
if hasattr(ws, "_websocket"):
inner_ws = ws._websocket
logger.info(f"Inner WebSocket type: {type(inner_ws)}")
logger.info(f"Has _connection: {hasattr(inner_ws, '_connection')}")
logger.info(f"Has _sock: {hasattr(inner_ws, '_sock')}")
# Send a test ping
logger.info("Sending test ping...")
try:
await ws.ping(b"test-ping-123")
logger.info("Ping sent successfully!")
except Exception as e:
logger.error(f"Ping failed: {e}")
# Test the keep_alive function
logger.info("\nTesting keep_alive function...")
from ari_client_manager import keep_alive
# Run keep_alive for a short time
keep_alive_task = asyncio.create_task(keep_alive(client, interval=3.0))
# Let it run for 10 seconds to see multiple pings
await asyncio.sleep(10)
# Cancel and cleanup
keep_alive_task.cancel()
try:
await keep_alive_task
except asyncio.CancelledError:
pass
logger.info("Test completed!")
except Exception as e:
logger.exception(f"Test failed: {e}")
if __name__ == "__main__":
asyncio.run(test_real_ping())

View file

@ -1,371 +0,0 @@
"""Worker Event Subscriber for distributed ARI architecture.
This component runs in each FastAPI worker process and subscribes to
Redis events from the ARI Manager. It creates pipelines for assigned calls
without any direct ARI connection.
"""
import asyncio
import json
import uuid
from typing import Awaitable, Callable, Optional
import redis.asyncio as aioredis
from loguru import logger
from api.routes.stasis_rtp import on_stasis_call
from api.services.telephony.stasis_event_protocol import (
DisconnectCommand,
RedisChannels,
RedisKeys,
StasisEndEvent,
StasisStartEvent,
parse_event,
)
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from pipecat.utils.run_context import set_current_run_id
class WorkerEventSubscriber:
"""Subscribes to ARI events from Redis and processes them in the worker."""
def __init__(
self,
redis_client: aioredis.Redis,
on_stasis_call: Callable[[StasisRTPConnection, dict], Awaitable[None]],
):
self.redis = redis_client
self.worker_id = str(uuid.uuid4()) # Generate unique worker ID
self.on_stasis_call = on_stasis_call
self._running = False
self._task: Optional[asyncio.Task] = None
self._heartbeat_task: Optional[asyncio.Task] = None
self._active_connections: dict[str, StasisRTPConnection] = {}
self._active_tasks: dict[str, asyncio.Task] = {}
self._cleanup_tasks: dict[str, asyncio.Task] = {}
self._shutting_down = False
self._shutdown_event = asyncio.Event()
async def start(self):
"""Start the event subscriber."""
if self._task is None:
self._running = True
# Register worker in Redis
await self._register_worker()
# Start main event loop
self._task = asyncio.create_task(
self._run(), name=f"worker_subscriber_{self.worker_id}"
)
# Start heartbeat task
self._heartbeat_task = asyncio.create_task(
self._heartbeat_loop(), name=f"worker_heartbeat_{self.worker_id}"
)
logger.info(f"Worker {self.worker_id} event subscriber started")
async def _register_worker(self):
"""Register this worker in Redis."""
worker_key = RedisKeys.worker_active(self.worker_id)
worker_data = json.dumps({"status": "ready", "active_calls": 0})
# Set with TTL of 30 seconds (will be refreshed by heartbeat)
await self.redis.setex(worker_key, 30, worker_data)
# Add to workers set
await self.redis.sadd(RedisKeys.workers_set(), self.worker_id)
logger.info(f"Worker {self.worker_id} registered in Redis")
async def _heartbeat_loop(self):
"""Send periodic heartbeats to Redis."""
try:
while self._running:
# Update worker status with current active call count
worker_key = RedisKeys.worker_active(self.worker_id)
worker_data = json.dumps(
{
"status": "draining" if self._shutting_down else "ready",
"active_calls": len(self._active_tasks),
}
)
# Refresh TTL to 30 seconds
await self.redis.setex(worker_key, 30, worker_data)
# Wait 10 seconds before next heartbeat
await asyncio.sleep(10)
except asyncio.CancelledError:
logger.debug(f"Worker {self.worker_id} heartbeat cancelled")
except Exception as e:
logger.exception(f"Worker {self.worker_id} heartbeat error: {e}")
async def graceful_shutdown(self, max_wait_seconds: int = 300):
"""Gracefully shutdown the worker, waiting for calls to complete.
Args:
max_wait_seconds: Maximum time to wait for calls to complete (default 5 minutes)
"""
logger.info(f"Worker {self.worker_id} starting graceful shutdown")
# Mark as shutting down to prevent new calls
self._shutting_down = True
# Update status in Redis to 'draining'
worker_key = RedisKeys.worker_active(self.worker_id)
worker_data = json.dumps(
{"status": "draining", "active_calls": len(self._active_tasks)}
)
await self.redis.setex(worker_key, 30, worker_data)
# Wait for active tasks to complete (with timeout)
start_time = asyncio.get_event_loop().time()
while (
self._active_tasks
and (asyncio.get_event_loop().time() - start_time) < max_wait_seconds
):
active_count = len(self._active_tasks)
logger.info(
f"Worker {self.worker_id} waiting for {active_count} active calls to complete"
)
# Update Redis with current status
worker_data = json.dumps(
{"status": "draining", "active_calls": active_count}
)
await self.redis.setex(worker_key, 30, worker_data)
# Wait a bit before checking again
await asyncio.sleep(5)
# Force stop if timeout reached
if self._active_tasks:
logger.warning(
f"Worker {self.worker_id} forcefully stopping {len(self._active_tasks)} active calls after timeout channel_ids: {list(self._active_tasks.keys())}"
)
await self.stop()
async def stop(self):
"""Stop the event subscriber and deregister from Redis."""
self._running = False
# Deregister from Redis
await self._deregister_worker()
# Cancel all active call processing tasks
for channel_id, task in list(self._active_tasks.items()):
if not task.done():
logger.info(f"Cancelling active call task for channel {channel_id}")
task.cancel()
# Cancel all cleanup tasks
for channel_id, task in list(self._cleanup_tasks.items()):
if not task.done():
logger.info(f"Cancelling cleanup task for channel {channel_id}")
task.cancel()
# Wait for all tasks to complete
all_tasks = list(self._active_tasks.values()) + list(
self._cleanup_tasks.values()
)
if all_tasks:
await asyncio.gather(*all_tasks, return_exceptions=True)
# Cancel heartbeat task
if self._heartbeat_task:
self._heartbeat_task.cancel()
try:
await self._heartbeat_task
except asyncio.CancelledError:
pass
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info(f"Worker {self.worker_id} event subscriber stopped")
async def _deregister_worker(self):
"""Remove this worker from Redis."""
try:
# Remove from active workers
await self.redis.delete(RedisKeys.worker_active(self.worker_id))
# Remove from workers set
await self.redis.srem(RedisKeys.workers_set(), self.worker_id)
logger.info(f"Worker {self.worker_id} deregistered from Redis")
except Exception as e:
logger.error(f"Error deregistering worker {self.worker_id}: {e}")
async def _run(self):
"""Main subscriber loop."""
self._running = True
channel = RedisChannels.worker_events(self.worker_id)
pubsub = self.redis.pubsub()
try:
await pubsub.subscribe(channel)
logger.info(f"Worker {self.worker_id} subscribed to {channel}")
async for message in pubsub.listen():
if not self._running:
break
if message["type"] == "message":
try:
await self._handle_event(message["data"])
except Exception as e:
logger.exception(f"Error handling event: {e}")
except asyncio.CancelledError:
logger.debug(f"Worker {self.worker_id} subscriber cancelled")
except Exception as e:
logger.exception(f"Worker {self.worker_id} subscriber error: {e}")
finally:
await pubsub.unsubscribe(channel)
await pubsub.aclose()
async def _handle_event(self, data: str):
"""Handle an event from the ARI Manager."""
event = parse_event(data)
if not event:
logger.warning(f"Failed to parse event: {data}")
return
if isinstance(event, StasisStartEvent):
await self._handle_stasis_start(event)
elif isinstance(event, StasisEndEvent):
await self._handle_stasis_end(event)
else:
logger.warning(
f"channelID: {event.channel_id} Unhandled event type: {type(event)}"
)
async def _handle_stasis_start(self, event: StasisStartEvent):
"""Handle a new call assignment."""
channel_id = event.channel_id
logger.info(
f"channelID: {channel_id} Worker {self.worker_id} handling StasisStart"
)
try:
# Create StasisRTPConnection without ARI client
connection = StasisRTPConnection(
redis_client=self.redis,
channel_id=channel_id,
caller_channel_id=event.caller_channel_id,
em_channel_id=event.em_channel_id,
bridge_id=event.bridge_id,
local_addr=tuple(event.local_addr) if event.local_addr else None,
remote_addr=tuple(event.remote_addr) if event.remote_addr else None,
)
# Store connection for cleanup
self._active_connections[channel_id] = connection
# Create a background task to handle the call
task = asyncio.create_task(
self._process_call(connection, event.call_context_vars, channel_id),
name=f"call_handler_{channel_id}",
)
self._active_tasks[channel_id] = task
except Exception as e:
logger.exception(f"Error handling StasisStart for {channel_id}: {e}")
# Send disconnect command if setup fails
await self._send_disconnect(channel_id, "setup_failed")
async def _process_call(
self, connection: StasisRTPConnection, call_context_vars: dict, channel_id: str
):
"""Process a call in the background."""
try:
await self.on_stasis_call(connection, call_context_vars)
except Exception as e:
logger.exception(f"Error processing call for {channel_id}: {e}")
# Send disconnect command if call processing fails
await self._send_disconnect(channel_id, "processing_failed")
finally:
# Clean up task reference
if channel_id in self._active_tasks:
del self._active_tasks[channel_id]
async def _process_cleanup(self, channel_id: str):
"""Process call cleanup in the background."""
try:
if channel_id in self._active_connections:
connection: StasisRTPConnection = self._active_connections[channel_id]
# We must wait for the connection's invocation
# before sending in remote disconnect. Otherwise,
# the event handlers won't be registered and we won't
# be able to call on_client_disconnected to cancel the
# pipeline
while not connection._connect_invoked:
await asyncio.sleep(0.1)
# Set the run_id context so that we can have it in logs
if connection.workflow_run_id:
set_current_run_id(connection.workflow_run_id)
await connection.handle_remote_disconnect()
del self._active_connections[channel_id]
except Exception as e:
logger.exception(f"Error during cleanup for {channel_id}: {e}")
finally:
# Clean up task reference from cleanup tasks dictionary
if channel_id in self._cleanup_tasks:
del self._cleanup_tasks[channel_id]
async def _handle_stasis_end(self, event: StasisEndEvent):
"""Handle call termination."""
channel_id = event.channel_id
logger.info(
f"channelID: {channel_id} Worker {self.worker_id} handling StasisEnd"
)
# Create a background task to handle the cleanup
if channel_id in self._active_connections:
# Check if there's already a cleanup task for this channel
if (
channel_id not in self._cleanup_tasks
or self._cleanup_tasks[channel_id].done()
):
# Lets start a new task, since we need to poll for
# connection to be invoked from the pipeline before
# caling remote disconnect
task = asyncio.create_task(
self._process_cleanup(channel_id),
name=f"cleanup_handler_{channel_id}",
)
self._cleanup_tasks[channel_id] = task
else:
logger.warning(
f"channelID: {channel_id} Cleanup skipped - cleanup task still running"
)
async def _send_disconnect(self, channel_id: str, reason: str):
"""Send disconnect command to ARI Manager."""
command = DisconnectCommand(channel_id=channel_id, reason=reason)
channel = RedisChannels.channel_commands(channel_id)
await self.redis.publish(channel, command.to_json())
async def setup_worker_subscriber(
redis_client: aioredis.Redis,
) -> WorkerEventSubscriber:
"""Setup the worker event subscriber with dynamic registration."""
subscriber = WorkerEventSubscriber(redis_client, on_stasis_call)
logger.info(f"Setting up worker event subscriber with ID {subscriber.worker_id}")
await subscriber.start()
return subscriber

View file

@ -18,7 +18,6 @@ from pipecat.services.llm_service import FunctionCallParams
from pipecat.utils.enums import EndTaskReason
if TYPE_CHECKING:
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from pipecat.frames.frames import Frame
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.google.llm import GoogleLLMService
@ -83,9 +82,6 @@ class PipecatEngine:
self._gathered_context: dict = {}
self._user_response_timeout_task: Optional[asyncio.Task] = None
# Stasis connection for immediate transfers
self._stasis_connection: Optional["StasisRTPConnection"] = None
# Will be set later in initialize() when we have
# access to _context
self._variable_extraction_manager = None
@ -695,23 +691,6 @@ class PipecatEngine:
"""
self.task = task
def set_stasis_connection(
self, connection: Optional["StasisRTPConnection"]
) -> None:
"""Set the Stasis RTP connection for immediate transfers.
This allows the engine to initiate transfers immediately when XFER
disposition is detected, without waiting for pipeline shutdown.
Args:
connection: The StasisRTPConnection instance, or None for non-Stasis transports
"""
self._stasis_connection = connection
if connection:
logger.debug(
f"Stasis connection set for immediate transfers: {connection.channel_id}"
)
def set_audio_config(self, audio_config) -> None:
"""Set the audio configuration for the pipeline."""
self._audio_config = audio_config

View file

@ -90,6 +90,7 @@
"integrations/telephony/vonage",
"integrations/telephony/cloudonix",
"integrations/telephony/vobiz",
"integrations/telephony/asterisk-ari",
"integrations/telephony/webhooks",
"integrations/telephony/custom"
]

View file

@ -0,0 +1,215 @@
---
title: "Asterisk ARI Integration"
description: "Connect Dograh AI to your Asterisk PBX using the Asterisk REST Interface (ARI)"
---
## Overview
Asterisk ARI (Asterisk REST Interface) allows you to connect Dograh AI voice agents to your existing Asterisk PBX. ARI provides a WebSocket-based event model for controlling calls via Stasis applications, giving Dograh full control over call flow and audio streaming.
This guide focuses on the Dograh-specific configuration. For general Asterisk installation and administration, refer to the [official Asterisk documentation](https://docs.asterisk.org/).
## Prerequisites
Before setting up the ARI integration, ensure you have:
- A running Asterisk instance (version 16 or later recommended)
- ARI module enabled in Asterisk
- `chan_websocket` (WebSocket channel driver) enabled in your Asterisk build
- Network connectivity between your Dograh instance and Asterisk
- Dograh AI instance running and accessible
<Note>
If you compiled Asterisk from source, ensure `chan_websocket` is included during the build. This module is required for external media streaming between Asterisk and Dograh. Refer to the [Asterisk build system documentation](https://docs.asterisk.org/) for details on enabling modules.
</Note>
## Asterisk Configuration
The following Asterisk configuration files need to be set up to work with Dograh. These are minimal examples focused on the Dograh integration -- refer to the [Asterisk documentation](https://docs.asterisk.org/) for full configuration details.
### Enable ARI (`ari.conf`)
Create an ARI user that Dograh will use to authenticate:
```ini
[general]
enabled = yes
[dograh]
type = user
read_only = no
password = your_secure_password
```
<Note>
The username (section name, e.g., `dograh`) and password here must match the **Stasis App Name** and **App Password** you configure in Dograh.
</Note>
### Enable the HTTP Server (`http.conf`)
ARI requires the Asterisk HTTP server to be enabled:
```ini
[general]
enabled = yes
bindaddr = 0.0.0.0
bindport = 8088
```
### Configure the Stasis Dialplan (`extensions.conf`)
Route incoming calls to your Stasis application so Dograh can handle them:
```ini
[from-external]
exten => _X.,1,NoOp(Incoming call to ${EXTEN})
same => n,Stasis(dograh)
same => n,Hangup()
```
Replace `dograh` with the app name you configured in `ari.conf` and in Dograh.
### Configure External Media Streaming (`websocket_client.conf`)
Dograh uses Asterisk's external media streaming to send and receive audio over WebSocket. Configure a WebSocket client connection that points to your Dograh instance:
```ini
[dograh_staging]
type = websocket_client
uri = ws://your-dograh-host:port/ws/audio
protocols = audio
```
<Note>
The section name (e.g., `dograh_staging`) is the **WebSocket Client Name** you'll enter in the Dograh telephony configuration. This name tells Asterisk which WebSocket connection to use for external media streaming during calls.
</Note>
Refer to the [Asterisk WebSocket documentation](https://docs.asterisk.org/) for additional `websocket_client.conf` options and TLS configuration.
## Configuration in Dograh
### Step 1: Navigate to Telephony Settings
1. Go to **Workflow** → **Phone Call** → **Configure Telephony**
2. Select **Asterisk (ARI)** as your provider
### Step 2: Enter Your ARI Credentials
Configure the following fields:
| Field | Description | Example |
|-------|-------------|---------|
| **ARI Endpoint URL** | HTTP base URL of your Asterisk ARI server | `http://asterisk.example.com:8088` |
| **Stasis App Name** | The ARI username configured in `ari.conf` | `dograh` |
| **App Password** | The ARI password configured in `ari.conf` | `your_secure_password` |
| **WebSocket Client Name** | The connection name from `websocket_client.conf` | `dograh_staging` |
| **Inbound Workflow ID** | The workflow to activate for inbound calls (optional) | `42` |
| **SIP Extensions / Numbers** | Optional SIP extensions or trunk numbers for outbound calls | `PJSIP/6001` or `6001` |
### Step 3: Save and Test
1. Click **Save Configuration**
2. Create a test workflow
3. Initiate a test call to verify the connection
## Inbound Calling
Unlike other telephony providers that use HTTP webhooks for inbound calls, ARI delivers inbound calls as **StasisStart events on the ARI WebSocket**. Dograh automatically detects these events and activates the configured workflow.
### How It Works
1. An external call arrives at Asterisk and the dialplan routes it to `Stasis(dograh)`
2. Asterisk fires a StasisStart event over the ARI WebSocket with the channel in `Ring` state
3. Dograh identifies this as an inbound call, validates your quota, and creates a workflow run
4. The call is answered, bridged to an external media channel, and your voice agent workflow begins
### Setting Up Inbound Calls
**Step 1: Configure the Asterisk dialplan**
Ensure your dialplan routes inbound calls to the Stasis application as shown in the [dialplan configuration above](#configure-the-stasis-dialplan-extensionsconf).
**Step 2: Set the Inbound Workflow ID in Dograh**
1. Go to **Workflow** → **Phone Call** → **Configure Telephony**
2. In the ARI configuration, enter the **Inbound Workflow ID** — this is the ID of the workflow you want to activate when an inbound call arrives
3. Click **Save Configuration**
You can find a workflow's ID in the URL when viewing it (e.g., `/workflows/42` means the ID is `42`).
<Note>
If no Inbound Workflow ID is configured, inbound calls will be hung up immediately. You must set this field for inbound calling to work.
</Note>
**Step 3: Test an inbound call**
Place a call to a number or extension routed to your Stasis application. You should see the workflow activate and the voice agent respond.
### Inbound Call Context
When an inbound call activates a workflow, the following context is available to your workflow:
| Field | Description |
|-------|-------------|
| `caller_number` | The caller's phone number or extension |
| `called_number` | The dialed number or extension |
| `direction` | Always `inbound` |
| `call_id` | The Asterisk channel ID |
| `provider` | Always `ari` |
## Troubleshooting
<AccordionGroup>
<Accordion title="Cannot connect to ARI endpoint">
- Verify the ARI endpoint URL is correct and reachable from your Dograh instance
- Check that the Asterisk HTTP server is running (`http.conf` has `enabled = yes`)
- Ensure firewall rules allow traffic on the ARI port (default: 8088)
- Confirm the ARI module is loaded: run `module show like res_ari` in the Asterisk CLI
</Accordion>
<Accordion title="Authentication failed">
- Verify the Stasis App Name matches the ARI user section name in `ari.conf`
- Check the App Password matches the password in `ari.conf`
- Ensure there are no extra spaces in the credentials
</Accordion>
<Accordion title="No audio during calls">
- Verify `chan_websocket` is loaded: run `module show like chan_websocket` in the Asterisk CLI
- Check that `websocket_client.conf` is correctly configured with the right Dograh URI
- Ensure the WebSocket Client Name in Dograh matches the section name in `websocket_client.conf`
- Verify network connectivity and firewall rules allow WebSocket traffic between Asterisk and Dograh
</Accordion>
<Accordion title="Calls not reaching Dograh">
- Ensure the dialplan routes calls to `Stasis(your_app_name)`
- Verify the app name in the dialplan matches the ARI user in `ari.conf`
- Check Asterisk CLI for errors: `asterisk -rvvv`
- Confirm the ARI WebSocket connection is active
</Accordion>
<Accordion title="Inbound calls are immediately hung up">
- Verify the **Inbound Workflow ID** is set in your ARI telephony configuration
- Confirm the workflow ID exists and belongs to the same organization as the ARI config
- Check that your organization has available quota
- Review Dograh logs for warnings mentioning "no inbound_workflow_id configured"
</Accordion>
<Accordion title="WebSocket client connection issues">
- Check the URI in `websocket_client.conf` points to the correct Dograh host and port
- Verify the Dograh instance is running and accepting WebSocket connections
- If using TLS, ensure certificates are correctly configured on both sides
</Accordion>
</AccordionGroup>
## Best Practices
- Keep your Asterisk instance on the same network or a low-latency connection to Dograh for optimal audio quality
- Use strong passwords for ARI authentication
- Restrict ARI access to known IP addresses using firewall rules
- Monitor Asterisk logs alongside Dograh logs when debugging call issues
- Keep Asterisk updated to the latest stable version for security and compatibility
## Further Reading
- [Asterisk Documentation](https://docs.asterisk.org/) -- official reference for all Asterisk configuration
- [ARI Documentation](https://docs.asterisk.org/Configuration/Interfaces/Asterisk-REST-Interface-ARI/) -- detailed ARI configuration and API reference

View file

@ -19,6 +19,9 @@ Dograh AI supports inbound calling across all supported telephony providers. Whe
<Card title="Vobiz" href="/integrations/telephony/vobiz">
Cloud-based telephony with global reach and competitive pricing
</Card>
<Card title="Asterisk ARI" href="/integrations/telephony/asterisk-ari">
Connect to your own Asterisk PBX via the Asterisk REST Interface
</Card>
</CardGroup>
<Note>
@ -46,6 +49,7 @@ The telephony configuration for inbound calling is **identical** to outbound cal
- [Twilio Configuration](/integrations/telephony/twilio#configuration)
- [Cloudonix Configuration](/integrations/telephony/cloudonix#configuration)
- [Vobiz Configuration](/integrations/telephony/vobiz#configuration)
- [Asterisk ARI Configuration](/integrations/telephony/asterisk-ari#configuration-in-dograh)
### Step 2: Get Your Workflow Webhook URL
@ -75,6 +79,7 @@ Each telephony provider requires additional configuration to route incoming call
- [Vonage Inbound Setup](/integrations/telephony/vonage#inbound-calling-setup)
- [Cloudonix Inbound Setup](/integrations/telephony/cloudonix#inbound-calling-setup)
- [Vobiz Inbound Setup](/integrations/telephony/vobiz#inbound-calling-setup)
- [Asterisk ARI Inbound Setup](/integrations/telephony/asterisk-ari#inbound-calling)
## Testing Inbound Calls

@ -1 +1 @@
Subproject commit 58469410c7d34851bd56e4e460ee5c7b77c31e0c
Subproject commit fbc9a768445e8f683721744659fc8904d4012081

View file

@ -8,6 +8,8 @@ import { toast } from "sonner";
import { getTelephonyConfigurationApiV1OrganizationsTelephonyConfigGet, saveTelephonyConfigurationApiV1OrganizationsTelephonyConfigPost } from "@/client/sdk.gen";
import type {
AriConfigurationRequest,
AriConfigurationResponse,
CloudonixConfigurationRequest,
CloudonixConfigurationResponse,
TelephonyConfigurationResponse,
@ -51,6 +53,12 @@ interface TelephonyConfigForm {
// Cloudonix fields
bearer_token?: string;
domain_id?: string;
// ARI fields
ari_endpoint?: string;
app_name?: string;
app_password?: string;
ws_client_name?: string;
inbound_workflow_id?: number;
// Common field - multiple phone numbers
from_numbers: string[];
}
@ -140,6 +148,19 @@ export default function ConfigureTelephonyPage() {
setValue("bearer_token", cloudonixConfig.bearer_token);
setValue("domain_id", cloudonixConfig.domain_id);
setValue("from_numbers", cloudonixConfig.from_numbers?.length > 0 ? cloudonixConfig.from_numbers : [""]);
} else if ((response.data as TelephonyConfigurationResponse)?.ari) {
const ariConfig = (response.data as TelephonyConfigurationResponse).ari as AriConfigurationResponse;
setHasExistingConfig(true);
setValue("provider", "ari");
setValue("ari_endpoint", ariConfig.ari_endpoint);
setValue("app_name", ariConfig.app_name);
setValue("app_password", ariConfig.app_password);
setValue("ws_client_name", ariConfig.ws_client_name);
setValue(
"inbound_workflow_id",
typeof ariConfig.inbound_workflow_id === "number" ? ariConfig.inbound_workflow_id : undefined
);
setValue("from_numbers", ariConfig.from_numbers?.length > 0 ? ariConfig.from_numbers : [""]);
}
}
} catch (error) {
@ -161,12 +182,13 @@ export default function ConfigureTelephonyPage() {
| TwilioConfigurationRequest
| VonageConfigurationRequest
| VobizConfigurationRequest
| CloudonixConfigurationRequest;
| CloudonixConfigurationRequest
| AriConfigurationRequest;
const filteredNumbers = data.from_numbers.filter(n => n.trim() !== "");
// Validate phone numbers are provided (except for Cloudonix where optional)
if (data.provider !== "cloudonix" && filteredNumbers.length === 0) {
// Validate phone numbers are provided (except for Cloudonix/ARI where optional)
if (data.provider !== "cloudonix" && data.provider !== "ari" && filteredNumbers.length === 0) {
toast.error("At least one phone number is required");
setIsLoading(false);
return;
@ -185,6 +207,10 @@ export default function ConfigureTelephonyPage() {
} else if (data.provider === "cloudonix") {
pattern = cloudonixPattern;
formatMessage = "(e.g., +1234567890)";
} else if (data.provider === "ari") {
// ARI uses SIP extensions - skip phone number validation
pattern = /^.+$/;
formatMessage = "(SIP extension or number)";
} else {
pattern = vonageVobizPattern;
formatMessage = "without + prefix (e.g., 14155551234)";
@ -220,14 +246,24 @@ export default function ConfigureTelephonyPage() {
auth_id: data.auth_id,
auth_token: data.vobiz_auth_token,
} as VobizConfigurationRequest;
} else {
// Cloudonix
} else if (data.provider === "cloudonix") {
requestBody = {
provider: data.provider,
from_numbers: filteredNumbers,
bearer_token: data.bearer_token!,
domain_id: data.domain_id!,
} as CloudonixConfigurationRequest;
} else {
// ARI
requestBody = {
provider: data.provider,
from_numbers: filteredNumbers,
ari_endpoint: data.ari_endpoint!,
app_name: data.app_name!,
app_password: data.app_password!,
ws_client_name: data.ws_client_name || "",
inbound_workflow_id: data.inbound_workflow_id || undefined,
} as AriConfigurationRequest;
}
const response = await saveTelephonyConfigurationApiV1OrganizationsTelephonyConfigPost({
@ -276,11 +312,18 @@ export default function ConfigureTelephonyPage() {
? "Vonage"
: selectedProvider === "vobiz"
? "Vobiz"
: selectedProvider === "ari"
? "Asterisk ARI"
: "Cloudonix"}{" "}
Setup Guide
</CardTitle>
<CardDescription>
{selectedProvider === "cloudonix" ? (
{selectedProvider === "ari" ? (
<>
Connect Dograh to your Asterisk PBX using the Asterisk REST Interface (ARI).
ARI provides a WebSocket-based event model for controlling calls via Stasis applications.
</>
) : selectedProvider === "cloudonix" ? (
<>
Cloudonix is an AI Connectivity platform, enabling you to connect Dograh to any SIP product or SIP Telephony Provider.<br/><br/>
<iframe
@ -325,7 +368,27 @@ export default function ConfigureTelephonyPage() {
</CardDescription>
</CardHeader>
<CardContent>
{selectedProvider === "twilio" || selectedProvider === "vonage" ? (
{selectedProvider === "ari" ? (
<div className="space-y-4 text-sm">
<div>
<h4 className="font-semibold mb-2">Getting Started with Asterisk ARI:</h4>
<ol className="list-decimal list-inside space-y-1 text-muted-foreground">
<li>Enable the ARI module in your Asterisk configuration (ari.conf)</li>
<li>Create an ARI user with a password in ari.conf</li>
<li>Create a Stasis application in your dialplan (extensions.conf)</li>
<li>Ensure the ARI HTTP endpoint is accessible from Dograh</li>
<li>Enter your ARI endpoint URL, app name, and password below</li>
</ol>
</div>
<div className="bg-muted border border-border rounded p-3">
<p className="text-sm">
<strong>Note:</strong> ARI uses WebSocket connections for real-time
event listening. The ARI manager process will automatically connect
to your Asterisk instance once configured.
</p>
</div>
</div>
) : selectedProvider === "twilio" || selectedProvider === "vonage" ? (
<div className="aspect-video">
<iframe
style={{ border: 0 }}
@ -407,6 +470,7 @@ export default function ConfigureTelephonyPage() {
<SelectItem value="vonage">Vonage</SelectItem>
<SelectItem value="vobiz">Vobiz</SelectItem>
<SelectItem value="cloudonix">Cloudonix</SelectItem>
<SelectItem value="ari">Asterisk (ARI)</SelectItem>
</SelectContent>
</Select>
{hasExistingConfig && (
@ -771,6 +835,140 @@ export default function ConfigureTelephonyPage() {
</>
)}
{/* ARI-specific fields */}
{selectedProvider === "ari" && (
<>
<div className="space-y-2">
<Label htmlFor="ari_endpoint">ARI Endpoint URL</Label>
<Input
id="ari_endpoint"
placeholder="http://asterisk.example.com:8088"
{...register("ari_endpoint", {
required:
selectedProvider === "ari"
? "ARI endpoint URL is required"
: false,
})}
/>
{errors.ari_endpoint && (
<p className="text-sm text-red-500">
{errors.ari_endpoint.message}
</p>
)}
<p className="text-xs text-muted-foreground">
The HTTP base URL for your Asterisk ARI (e.g., http://host:8088)
</p>
</div>
<div className="space-y-2">
<Label htmlFor="app_name">Stasis App Name</Label>
<Input
id="app_name"
placeholder="dograh"
{...register("app_name", {
required:
selectedProvider === "ari"
? "Stasis app name is required"
: false,
})}
/>
{errors.app_name && (
<p className="text-sm text-red-500">
{errors.app_name.message}
</p>
)}
<p className="text-xs text-muted-foreground">
The ARI username and Stasis application name configured in ari.conf
</p>
</div>
<div className="space-y-2">
<Label htmlFor="app_password">App Password</Label>
<Input
id="app_password"
type="password"
autoComplete="current-password"
placeholder={
hasExistingConfig
? "Leave masked to keep existing"
: "Enter your ARI password"
}
{...register("app_password", {
required:
selectedProvider === "ari" && !hasExistingConfig
? "App password is required"
: false,
})}
/>
{errors.app_password && (
<p className="text-sm text-red-500">
{errors.app_password.message}
</p>
)}
</div>
<div className="space-y-2">
<Label htmlFor="ws_client_name">WebSocket Client Name</Label>
<Input
id="ws_client_name"
placeholder="dograh_staging"
{...register("ws_client_name")}
/>
<p className="text-xs text-muted-foreground">
Connection name from Asterisk&apos;s websocket_client.conf for external media streaming
</p>
</div>
<div className="space-y-2">
<Label htmlFor="inbound_workflow_id">Inbound Workflow ID (Optional)</Label>
<Input
id="inbound_workflow_id"
type="number"
placeholder="e.g. 42"
{...register("inbound_workflow_id", { valueAsNumber: true })}
/>
<p className="text-xs text-muted-foreground">
Workflow to activate for inbound calls received via ARI
</p>
</div>
<div className="space-y-2">
<Label>SIP Extensions / Numbers (Optional)</Label>
{fromNumbers.map((number, index) => (
<div key={index} className="flex gap-2">
<Input
placeholder="PJSIP/6001 or 6001"
value={number}
onChange={(e) => updatePhoneNumber(index, e.target.value)}
/>
{fromNumbers.length > 1 && (
<Button
type="button"
variant="outline"
size="icon"
onClick={() => removePhoneNumber(index)}
>
<Trash2 className="h-4 w-4" />
</Button>
)}
</div>
))}
<Button
type="button"
variant="outline"
size="sm"
onClick={addPhoneNumber}
>
<Plus className="h-4 w-4 mr-2" />
Add Extension
</Button>
<p className="text-xs text-muted-foreground">
SIP extensions or trunk numbers for outbound calls
</p>
</div>
</>
)}
<div className="pt-4 space-y-3">
<Button
type="submit"

View file

@ -21,6 +21,7 @@ import {
DialogHeader,
DialogTitle,
} from "@/components/ui/dialog";
import { Input } from "@/components/ui/input";
import { useUserConfig } from "@/context/UserConfigContext";
interface PhoneCallDialogProps {
@ -47,6 +48,7 @@ export const PhoneCallDialog = ({
const [phoneChanged, setPhoneChanged] = useState(false);
const [checkingConfig, setCheckingConfig] = useState(false);
const [needsConfiguration, setNeedsConfiguration] = useState<boolean | null>(null);
const [sipMode, setSipMode] = useState(() => /^(PJSIP|SIP)\//i.test(userConfig?.test_phone_number || ""));
// Check telephony configuration when dialog opens
useEffect(() => {
@ -60,7 +62,7 @@ export const PhoneCallDialog = ({
headers: { 'Authorization': `Bearer ${accessToken}` },
});
if (configResponse.error || (!configResponse.data?.twilio && !configResponse.data?.vonage && !configResponse.data?.vobiz && !configResponse.data?.cloudonix)) {
if (configResponse.error || (!configResponse.data?.twilio && !configResponse.data?.vonage && !configResponse.data?.vobiz && !configResponse.data?.cloudonix && !configResponse.data?.ari)) {
setNeedsConfiguration(true);
} else {
setNeedsConfiguration(false);
@ -89,7 +91,9 @@ export const PhoneCallDialog = ({
// Keep phoneNumber in sync with userConfig when dialog opens
useEffect(() => {
if (open) {
setPhoneNumber(userConfig?.test_phone_number || "");
const saved = userConfig?.test_phone_number || "";
setPhoneNumber(saved);
setSipMode(/^(PJSIP|SIP)\//i.test(saved));
setPhoneChanged(false);
setCallError(null);
setCallSuccessMsg(null);
@ -189,14 +193,29 @@ export const PhoneCallDialog = ({
<DialogHeader>
<DialogTitle>Phone Call</DialogTitle>
<DialogDescription>
Enter the phone number to call. The number will be saved automatically.
Enter the phone number or SIP endpoint to call. The number will be saved automatically.
</DialogDescription>
</DialogHeader>
<PhoneInput
defaultCountry="in"
value={phoneNumber}
onChange={handlePhoneInputChange}
/>
{sipMode ? (
<Input
value={phoneNumber}
onChange={(e) => handlePhoneInputChange(e.target.value)}
placeholder="PJSIP/1234 or SIP/1234"
/>
) : (
<PhoneInput
defaultCountry="in"
value={phoneNumber}
onChange={handlePhoneInputChange}
/>
)}
<button
type="button"
className="text-xs text-muted-foreground hover:text-foreground underline"
onClick={() => { setSipMode(!sipMode); setPhoneNumber(""); setPhoneChanged(true); }}
>
{sipMode ? "Use phone number instead" : "Use SIP endpoint instead"}
</button>
<DialogFooter className="flex-col sm:flex-row gap-2">
<Button
variant="outline"
@ -219,9 +238,14 @@ export const PhoneCallDialog = ({
{callLoading ? "Calling..." : "Start Call"}
</Button>
) : (
<Button onClick={() => onOpenChange(false)}>
Close
</Button>
<>
<Button variant="outline" onClick={() => { setCallSuccessMsg(null); setCallError(null); }}>
Call Again
</Button>
<Button onClick={() => onOpenChange(false)}>
Close
</Button>
</>
)}
</div>
</DialogFooter>

View file

@ -19,6 +19,50 @@ export type ApiKeyStatusResponse = {
status: Array<ApiKeyStatus>;
};
/**
* Request schema for Asterisk ARI configuration.
*/
export type AriConfigurationRequest = {
provider?: string;
/**
* ARI base URL (e.g., http://asterisk.example.com:8088)
*/
ari_endpoint: string;
/**
* Stasis application name registered in Asterisk
*/
app_name: string;
/**
* ARI user password
*/
app_password: string;
/**
* websocket_client.conf connection name for externalMedia (e.g., dograh_staging)
*/
ws_client_name?: string;
/**
* Workflow ID for inbound calls
*/
inbound_workflow_id?: number | null;
/**
* List of SIP extensions/numbers for outbound calls (optional)
*/
from_numbers?: Array<string>;
};
/**
* Response schema for ARI configuration with masked sensitive fields.
*/
export type AriConfigurationResponse = {
provider: string;
ari_endpoint: string;
app_name: string;
app_password: string;
ws_client_name?: string;
inbound_workflow_id?: number | null;
from_numbers: Array<string>;
};
export type AccessTokenResponse = {
access_token: string | null;
refresh_token: string | null;
@ -795,6 +839,7 @@ export type TelephonyConfigurationResponse = {
vonage?: VonageConfigurationResponse | null;
vobiz?: VobizConfigurationResponse | null;
cloudonix?: CloudonixConfigurationResponse | null;
ari?: AriConfigurationResponse | null;
};
export type TestSessionResponse = {
@ -3461,7 +3506,7 @@ export type GetTelephonyConfigurationApiV1OrganizationsTelephonyConfigGetRespons
export type GetTelephonyConfigurationApiV1OrganizationsTelephonyConfigGetResponse = GetTelephonyConfigurationApiV1OrganizationsTelephonyConfigGetResponses[keyof GetTelephonyConfigurationApiV1OrganizationsTelephonyConfigGetResponses];
export type SaveTelephonyConfigurationApiV1OrganizationsTelephonyConfigPostData = {
body: TwilioConfigurationRequest | VonageConfigurationRequest | VobizConfigurationRequest | CloudonixConfigurationRequest;
body: TwilioConfigurationRequest | VonageConfigurationRequest | VobizConfigurationRequest | CloudonixConfigurationRequest | AriConfigurationRequest;
headers?: {
authorization?: string | null;
'X-API-Key'?: string | null;

View file

@ -7,9 +7,9 @@ export const WORKFLOW_RUN_MODES = {
VONAGE: 'vonage',
VOBIZ: 'vobiz',
CLOUDONIX: 'cloudonix',
STASIS: 'stasis',
WEBRTC: 'webrtc',
SMALL_WEBRTC: 'smallwebrtc',
ARI: 'ari'
} as const;
export type WorkflowRunMode = typeof WORKFLOW_RUN_MODES[keyof typeof WORKFLOW_RUN_MODES];