diff --git a/api/routes/organization.py b/api/routes/organization.py
index 7d4a10a..f840df8 100644
--- a/api/routes/organization.py
+++ b/api/routes/organization.py
@@ -135,6 +135,8 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
ws_client_name = config.value.get("ws_client_name", "")
from_numbers = config.value.get("from_numbers", [])
+ inbound_workflow_id = config.value.get("inbound_workflow_id")
+
return TelephonyConfigurationResponse(
ari=ARIConfigurationResponse(
provider="ari",
@@ -142,6 +144,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
app_name=app_name,
app_password=mask_key(app_password) if app_password else "",
ws_client_name=ws_client_name,
+ inbound_workflow_id=inbound_workflow_id,
from_numbers=from_numbers,
),
)
@@ -208,6 +211,7 @@ async def save_telephony_configuration(
"app_name": request.app_name,
"app_password": request.app_password,
"ws_client_name": request.ws_client_name,
+ "inbound_workflow_id": request.inbound_workflow_id,
"from_numbers": request.from_numbers,
}
else:
diff --git a/api/schemas/telephony_config.py b/api/schemas/telephony_config.py
index 72c248f..da6d605 100644
--- a/api/schemas/telephony_config.py
+++ b/api/schemas/telephony_config.py
@@ -104,6 +104,9 @@ class ARIConfigurationRequest(BaseModel):
default="",
description="websocket_client.conf connection name for externalMedia (e.g., dograh_staging)",
)
+ inbound_workflow_id: Optional[int] = Field(
+ default=None, description="Workflow ID for inbound calls"
+ )
from_numbers: List[str] = Field(
default_factory=list,
description="List of SIP extensions/numbers for outbound calls (optional)",
@@ -118,6 +121,7 @@ class ARIConfigurationResponse(BaseModel):
app_name: str
app_password: str # Masked
ws_client_name: str = ""
+ inbound_workflow_id: Optional[int] = None
from_numbers: List[str]
diff --git a/api/services/telephony/ari_manager.py b/api/services/telephony/ari_manager.py
index aa71e6f..2468972 100644
--- a/api/services/telephony/ari_manager.py
+++ b/api/services/telephony/ari_manager.py
@@ -24,10 +24,12 @@ from loguru import logger
from api.constants import REDIS_URL
from api.db import db_client
-from api.enums import OrganizationConfigurationKey
+from api.enums import CallType, OrganizationConfigurationKey, WorkflowRunMode
+from api.services.quota_service import check_dograh_quota_by_user_id
# Redis key pattern and TTL for channel-to-run mapping
_CHANNEL_KEY_PREFIX = "ari:channel:"
+_EXT_CHANNEL_KEY_PREFIX = "ari:ext_channel:"
_CHANNEL_KEY_TTL = 3600 # 1 hour safety expiry
@@ -41,12 +43,14 @@ class ARIConnection:
app_name: str,
app_password: str,
ws_client_name: str = "",
+ inbound_workflow_id: int = None,
):
self.organization_id = organization_id
self.ari_endpoint = ari_endpoint.rstrip("/")
self.app_name = app_name
self.app_password = app_password
self.ws_client_name = ws_client_name
+ self.inbound_workflow_id = inbound_workflow_id
self._ws: Optional[websockets.ClientConnection] = None
self._task: Optional[asyncio.Task] = None
@@ -88,6 +92,23 @@ class ARIConnection:
keys = [f"{_CHANNEL_KEY_PREFIX}{cid}" for cid in channel_ids]
await r.delete(*keys)
+ async def _mark_ext_channel(self, channel_id: str):
+ """Mark a channel as an external media channel we created."""
+ r = await self._get_redis()
+ await r.set(
+ f"{_EXT_CHANNEL_KEY_PREFIX}{channel_id}", "1", ex=_CHANNEL_KEY_TTL
+ )
+
+ async def _is_ext_channel(self, channel_id: str) -> bool:
+ """Check if a channel is an external media channel we created."""
+ r = await self._get_redis()
+ return await r.exists(f"{_EXT_CHANNEL_KEY_PREFIX}{channel_id}") > 0
+
+ async def _delete_ext_channel(self, channel_id: str):
+ """Remove the external media channel marker."""
+ r = await self._get_redis()
+ await r.delete(f"{_EXT_CHANNEL_KEY_PREFIX}{channel_id}")
+
@property
def ws_url(self) -> str:
"""Build the ARI WebSocket URL."""
@@ -211,6 +232,16 @@ class ARIConnection:
channel_state = channel.get("state", "unknown")
if event_type == "StasisStart":
+ # Skip external media channels we created — they fire
+ # their own StasisStart but need no further handling.
+ if await self._is_ext_channel(channel_id):
+ logger.debug(
+ f"[ARI org={self.organization_id}] StasisStart for our "
+ f"externalMedia channel {channel_id}, ignoring"
+ )
+ await self._delete_ext_channel(channel_id)
+ return
+
app_args = event.get("args", [])
caller = channel.get("caller", {})
logger.info(
@@ -220,31 +251,38 @@ class ARIConnection:
f"args={app_args}"
)
- # Parse args to extract workflow context
- args_dict = {}
- for arg in app_args:
- for pair in arg.split(","):
- if "=" in pair:
- key, value = pair.split("=", 1)
- args_dict[key.strip()] = value.strip()
-
- workflow_run_id = args_dict.get("workflow_run_id")
- workflow_id = args_dict.get("workflow_id")
- user_id = args_dict.get("user_id")
-
- if not workflow_run_id or not workflow_id or not user_id:
- logger.warning(
- f"[ARI org={self.organization_id}] StasisStart missing required args: "
- f"workflow_run_id={workflow_run_id}, workflow_id={workflow_id}, user_id={user_id}"
+ if channel_state == "Ring":
+ # Inbound call — arrived from outside, not yet answered
+ asyncio.create_task(
+ self._handle_inbound_stasis_start(channel_id, channel_state, event)
)
- return
+ else:
+ # Outbound call (state == "Up") — originated by us
+ # Parse args to extract workflow context
+ args_dict = {}
+ for arg in app_args:
+ for pair in arg.split(","):
+ if "=" in pair:
+ key, value = pair.split("=", 1)
+ args_dict[key.strip()] = value.strip()
- # Start pipeline connection in background task
- asyncio.create_task(
- self._handle_stasis_start(
- channel_id, channel_state, workflow_run_id, workflow_id, user_id
+ workflow_run_id = args_dict.get("workflow_run_id")
+ workflow_id = args_dict.get("workflow_id")
+ user_id = args_dict.get("user_id")
+
+ if not workflow_run_id or not workflow_id or not user_id:
+ logger.warning(
+ f"[ARI org={self.organization_id}] StasisStart missing required args: "
+ f"workflow_run_id={workflow_run_id}, workflow_id={workflow_id}, user_id={user_id}"
+ )
+ return
+
+ # Start pipeline connection in background task
+ asyncio.create_task(
+ self._handle_stasis_start(
+ channel_id, channel_state, workflow_run_id, workflow_id, user_id
+ )
)
- )
elif event_type == "StasisEnd":
logger.info(
@@ -345,6 +383,7 @@ class ARIConnection:
)
ext_channel_id = result.get("id", "")
if ext_channel_id:
+ await self._mark_ext_channel(ext_channel_id)
logger.info(
f"[ARI org={self.organization_id}] Created external media channel: {ext_channel_id}"
)
@@ -374,6 +413,93 @@ class ARIConnection:
)
return bridge_id
+ async def _handle_inbound_stasis_start(
+ self, channel_id: str, channel_state: str, event: dict
+ ):
+ """Handle an inbound call (StasisStart with state=Ring).
+
+ Validates quota, creates a workflow run, then delegates to the
+ standard answer→externalMedia→bridge pipeline.
+ """
+ channel = event.get("channel", {})
+ caller_number = channel.get("caller", {}).get("number", "unknown")
+ called_number = channel.get("dialplan", {}).get("exten", "unknown")
+
+ try:
+ # 1. Check inbound_workflow_id is configured
+ if not self.inbound_workflow_id:
+ logger.warning(
+ f"[ARI org={self.organization_id}] Inbound call on channel {channel_id} "
+ f"but no inbound_workflow_id configured — hanging up"
+ )
+ await self._delete_channel(channel_id)
+ return
+
+ # 2. Load workflow to get user_id and verify organization
+ workflow = await db_client.get_workflow(
+ self.inbound_workflow_id, organization_id=self.organization_id
+ )
+ if not workflow:
+ logger.warning(
+ f"[ARI org={self.organization_id}] Workflow {self.inbound_workflow_id} "
+ f"not found or doesn't belong to this organization — hanging up"
+ )
+ await self._delete_channel(channel_id)
+ return
+
+ user_id = workflow.user_id
+
+ # 3. Check quota
+ quota_result = await check_dograh_quota_by_user_id(user_id)
+ if not quota_result.has_quota:
+ logger.warning(
+ f"[ARI org={self.organization_id}] Quota exceeded for user {user_id} "
+ f"— hanging up inbound call {channel_id}"
+ )
+ await self._delete_channel(channel_id)
+ return
+
+ # 4. Create workflow run
+ call_id = channel_id
+ workflow_run = await db_client.create_workflow_run(
+ name=f"ARI Inbound {caller_number}",
+ workflow_id=self.inbound_workflow_id,
+ mode=WorkflowRunMode.ARI.value,
+ user_id=user_id,
+ call_type=CallType.INBOUND,
+ initial_context={
+ "caller_number": caller_number,
+ "called_number": called_number,
+ "direction": "inbound",
+ "call_id": call_id,
+ "provider": "ari",
+ },
+ )
+
+ logger.info(
+ f"[ARI org={self.organization_id}] Created inbound workflow run "
+ f"{workflow_run.id} for channel {channel_id} "
+ f"(caller={caller_number}, called={called_number})"
+ )
+
+ # 5. Delegate to the standard pipeline
+ await self._handle_stasis_start(
+ channel_id,
+ channel_state,
+ str(workflow_run.id),
+ str(self.inbound_workflow_id),
+ str(user_id),
+ )
+ except Exception as e:
+ logger.error(
+ f"[ARI org={self.organization_id}] Error handling inbound StasisStart "
+ f"for channel {channel_id}: {e}"
+ )
+ try:
+ await self._delete_channel(channel_id)
+ except Exception:
+ pass
+
async def _handle_stasis_start(
self,
channel_id: str,
@@ -588,9 +714,15 @@ class ARIManager:
app_name = config["app_name"]
app_password = config["app_password"]
ws_client_name = config["ws_client_name"]
+ inbound_workflow_id = config.get("inbound_workflow_id")
conn = ARIConnection(
- org_id, ari_endpoint, app_name, app_password, ws_client_name
+ org_id,
+ ari_endpoint,
+ app_name,
+ app_password,
+ ws_client_name,
+ inbound_workflow_id=inbound_workflow_id,
)
key = conn.connection_key
@@ -604,9 +736,12 @@ class ARIManager:
self._connections[key] = conn
await conn.start()
else:
- # Existing configuration - check if password changed
+ # Existing configuration - check if password or inbound_workflow_id changed
existing = self._connections[key]
- if existing.app_password != app_password:
+ if (
+ existing.app_password != app_password
+ or existing.inbound_workflow_id != inbound_workflow_id
+ ):
logger.info(
f"[ARI Manager] Config changed for org {org_id}, reconnecting..."
)
@@ -666,6 +801,7 @@ class ARIManager:
"app_name": app_name,
"app_password": app_password,
"ws_client_name": ws_client_name,
+ "inbound_workflow_id": value.get("inbound_workflow_id"),
}
)
diff --git a/api/services/telephony/factory.py b/api/services/telephony/factory.py
index ac03d6f..0e2bb6c 100644
--- a/api/services/telephony/factory.py
+++ b/api/services/telephony/factory.py
@@ -82,6 +82,7 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
"ari_endpoint": config.value.get("ari_endpoint"),
"app_name": config.value.get("app_name"),
"app_password": config.value.get("app_password"),
+ "inbound_workflow_id": config.value.get("inbound_workflow_id"),
"from_numbers": config.value.get("from_numbers", []),
}
else:
diff --git a/api/services/telephony/providers/ari_provider.py b/api/services/telephony/providers/ari_provider.py
index 67b449c..139065a 100644
--- a/api/services/telephony/providers/ari_provider.py
+++ b/api/services/telephony/providers/ari_provider.py
@@ -50,6 +50,7 @@ class ARIProvider(TelephonyProvider):
self.ari_endpoint = config.get("ari_endpoint", "").rstrip("/")
self.app_name = config.get("app_name", "")
self.app_password = config.get("app_password", "")
+ self.inbound_workflow_id = config.get("inbound_workflow_id")
self.from_numbers = config.get("from_numbers", [])
if isinstance(self.from_numbers, str):
diff --git a/docs/docs.json b/docs/docs.json
index c4e3a2b..c6e8d42 100644
--- a/docs/docs.json
+++ b/docs/docs.json
@@ -90,6 +90,7 @@
"integrations/telephony/vonage",
"integrations/telephony/cloudonix",
"integrations/telephony/vobiz",
+ "integrations/telephony/asterisk-ari",
"integrations/telephony/webhooks",
"integrations/telephony/custom"
]
diff --git a/docs/integrations/telephony/asterisk-ari.mdx b/docs/integrations/telephony/asterisk-ari.mdx
new file mode 100644
index 0000000..da2c3c7
--- /dev/null
+++ b/docs/integrations/telephony/asterisk-ari.mdx
@@ -0,0 +1,215 @@
+---
+title: "Asterisk ARI Integration"
+description: "Connect Dograh AI to your Asterisk PBX using the Asterisk REST Interface (ARI)"
+---
+
+## Overview
+
+Asterisk ARI (Asterisk REST Interface) allows you to connect Dograh AI voice agents to your existing Asterisk PBX. ARI provides a WebSocket-based event model for controlling calls via Stasis applications, giving Dograh full control over call flow and audio streaming.
+
+This guide focuses on the Dograh-specific configuration. For general Asterisk installation and administration, refer to the [official Asterisk documentation](https://docs.asterisk.org/).
+
+## Prerequisites
+
+Before setting up the ARI integration, ensure you have:
+
+- A running Asterisk instance (version 16 or later recommended)
+- ARI module enabled in Asterisk
+- `chan_websocket` (WebSocket channel driver) enabled in your Asterisk build
+- Network connectivity between your Dograh instance and Asterisk
+- Dograh AI instance running and accessible
+
+
+ Workflow to activate for inbound calls received via ARI +
+