feat: handling inbound calls

This commit is contained in:
Abhishek Kumar 2026-02-16 13:01:10 +05:30
parent 1821872f7a
commit 4bcf10bfae
10 changed files with 417 additions and 26 deletions

View file

@ -135,6 +135,8 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
ws_client_name = config.value.get("ws_client_name", "")
from_numbers = config.value.get("from_numbers", [])
inbound_workflow_id = config.value.get("inbound_workflow_id")
return TelephonyConfigurationResponse(
ari=ARIConfigurationResponse(
provider="ari",
@ -142,6 +144,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
app_name=app_name,
app_password=mask_key(app_password) if app_password else "",
ws_client_name=ws_client_name,
inbound_workflow_id=inbound_workflow_id,
from_numbers=from_numbers,
),
)
@ -208,6 +211,7 @@ async def save_telephony_configuration(
"app_name": request.app_name,
"app_password": request.app_password,
"ws_client_name": request.ws_client_name,
"inbound_workflow_id": request.inbound_workflow_id,
"from_numbers": request.from_numbers,
}
else:

View file

@ -104,6 +104,9 @@ class ARIConfigurationRequest(BaseModel):
default="",
description="websocket_client.conf connection name for externalMedia (e.g., dograh_staging)",
)
inbound_workflow_id: Optional[int] = Field(
default=None, description="Workflow ID for inbound calls"
)
from_numbers: List[str] = Field(
default_factory=list,
description="List of SIP extensions/numbers for outbound calls (optional)",
@ -118,6 +121,7 @@ class ARIConfigurationResponse(BaseModel):
app_name: str
app_password: str # Masked
ws_client_name: str = ""
inbound_workflow_id: Optional[int] = None
from_numbers: List[str]

View file

@ -24,10 +24,12 @@ from loguru import logger
from api.constants import REDIS_URL
from api.db import db_client
from api.enums import OrganizationConfigurationKey
from api.enums import CallType, OrganizationConfigurationKey, WorkflowRunMode
from api.services.quota_service import check_dograh_quota_by_user_id
# Redis key pattern and TTL for channel-to-run mapping
_CHANNEL_KEY_PREFIX = "ari:channel:"
_EXT_CHANNEL_KEY_PREFIX = "ari:ext_channel:"
_CHANNEL_KEY_TTL = 3600 # 1 hour safety expiry
@ -41,12 +43,14 @@ class ARIConnection:
app_name: str,
app_password: str,
ws_client_name: str = "",
inbound_workflow_id: int = None,
):
self.organization_id = organization_id
self.ari_endpoint = ari_endpoint.rstrip("/")
self.app_name = app_name
self.app_password = app_password
self.ws_client_name = ws_client_name
self.inbound_workflow_id = inbound_workflow_id
self._ws: Optional[websockets.ClientConnection] = None
self._task: Optional[asyncio.Task] = None
@ -88,6 +92,23 @@ class ARIConnection:
keys = [f"{_CHANNEL_KEY_PREFIX}{cid}" for cid in channel_ids]
await r.delete(*keys)
async def _mark_ext_channel(self, channel_id: str):
"""Mark a channel as an external media channel we created."""
r = await self._get_redis()
await r.set(
f"{_EXT_CHANNEL_KEY_PREFIX}{channel_id}", "1", ex=_CHANNEL_KEY_TTL
)
async def _is_ext_channel(self, channel_id: str) -> bool:
"""Check if a channel is an external media channel we created."""
r = await self._get_redis()
return await r.exists(f"{_EXT_CHANNEL_KEY_PREFIX}{channel_id}") > 0
async def _delete_ext_channel(self, channel_id: str):
"""Remove the external media channel marker."""
r = await self._get_redis()
await r.delete(f"{_EXT_CHANNEL_KEY_PREFIX}{channel_id}")
@property
def ws_url(self) -> str:
"""Build the ARI WebSocket URL."""
@ -211,6 +232,16 @@ class ARIConnection:
channel_state = channel.get("state", "unknown")
if event_type == "StasisStart":
# Skip external media channels we created — they fire
# their own StasisStart but need no further handling.
if await self._is_ext_channel(channel_id):
logger.debug(
f"[ARI org={self.organization_id}] StasisStart for our "
f"externalMedia channel {channel_id}, ignoring"
)
await self._delete_ext_channel(channel_id)
return
app_args = event.get("args", [])
caller = channel.get("caller", {})
logger.info(
@ -220,31 +251,38 @@ class ARIConnection:
f"args={app_args}"
)
# Parse args to extract workflow context
args_dict = {}
for arg in app_args:
for pair in arg.split(","):
if "=" in pair:
key, value = pair.split("=", 1)
args_dict[key.strip()] = value.strip()
workflow_run_id = args_dict.get("workflow_run_id")
workflow_id = args_dict.get("workflow_id")
user_id = args_dict.get("user_id")
if not workflow_run_id or not workflow_id or not user_id:
logger.warning(
f"[ARI org={self.organization_id}] StasisStart missing required args: "
f"workflow_run_id={workflow_run_id}, workflow_id={workflow_id}, user_id={user_id}"
if channel_state == "Ring":
# Inbound call — arrived from outside, not yet answered
asyncio.create_task(
self._handle_inbound_stasis_start(channel_id, channel_state, event)
)
return
else:
# Outbound call (state == "Up") — originated by us
# Parse args to extract workflow context
args_dict = {}
for arg in app_args:
for pair in arg.split(","):
if "=" in pair:
key, value = pair.split("=", 1)
args_dict[key.strip()] = value.strip()
# Start pipeline connection in background task
asyncio.create_task(
self._handle_stasis_start(
channel_id, channel_state, workflow_run_id, workflow_id, user_id
workflow_run_id = args_dict.get("workflow_run_id")
workflow_id = args_dict.get("workflow_id")
user_id = args_dict.get("user_id")
if not workflow_run_id or not workflow_id or not user_id:
logger.warning(
f"[ARI org={self.organization_id}] StasisStart missing required args: "
f"workflow_run_id={workflow_run_id}, workflow_id={workflow_id}, user_id={user_id}"
)
return
# Start pipeline connection in background task
asyncio.create_task(
self._handle_stasis_start(
channel_id, channel_state, workflow_run_id, workflow_id, user_id
)
)
)
elif event_type == "StasisEnd":
logger.info(
@ -345,6 +383,7 @@ class ARIConnection:
)
ext_channel_id = result.get("id", "")
if ext_channel_id:
await self._mark_ext_channel(ext_channel_id)
logger.info(
f"[ARI org={self.organization_id}] Created external media channel: {ext_channel_id}"
)
@ -374,6 +413,93 @@ class ARIConnection:
)
return bridge_id
async def _handle_inbound_stasis_start(
self, channel_id: str, channel_state: str, event: dict
):
"""Handle an inbound call (StasisStart with state=Ring).
Validates quota, creates a workflow run, then delegates to the
standard answerexternalMediabridge pipeline.
"""
channel = event.get("channel", {})
caller_number = channel.get("caller", {}).get("number", "unknown")
called_number = channel.get("dialplan", {}).get("exten", "unknown")
try:
# 1. Check inbound_workflow_id is configured
if not self.inbound_workflow_id:
logger.warning(
f"[ARI org={self.organization_id}] Inbound call on channel {channel_id} "
f"but no inbound_workflow_id configured — hanging up"
)
await self._delete_channel(channel_id)
return
# 2. Load workflow to get user_id and verify organization
workflow = await db_client.get_workflow(
self.inbound_workflow_id, organization_id=self.organization_id
)
if not workflow:
logger.warning(
f"[ARI org={self.organization_id}] Workflow {self.inbound_workflow_id} "
f"not found or doesn't belong to this organization — hanging up"
)
await self._delete_channel(channel_id)
return
user_id = workflow.user_id
# 3. Check quota
quota_result = await check_dograh_quota_by_user_id(user_id)
if not quota_result.has_quota:
logger.warning(
f"[ARI org={self.organization_id}] Quota exceeded for user {user_id} "
f"— hanging up inbound call {channel_id}"
)
await self._delete_channel(channel_id)
return
# 4. Create workflow run
call_id = channel_id
workflow_run = await db_client.create_workflow_run(
name=f"ARI Inbound {caller_number}",
workflow_id=self.inbound_workflow_id,
mode=WorkflowRunMode.ARI.value,
user_id=user_id,
call_type=CallType.INBOUND,
initial_context={
"caller_number": caller_number,
"called_number": called_number,
"direction": "inbound",
"call_id": call_id,
"provider": "ari",
},
)
logger.info(
f"[ARI org={self.organization_id}] Created inbound workflow run "
f"{workflow_run.id} for channel {channel_id} "
f"(caller={caller_number}, called={called_number})"
)
# 5. Delegate to the standard pipeline
await self._handle_stasis_start(
channel_id,
channel_state,
str(workflow_run.id),
str(self.inbound_workflow_id),
str(user_id),
)
except Exception as e:
logger.error(
f"[ARI org={self.organization_id}] Error handling inbound StasisStart "
f"for channel {channel_id}: {e}"
)
try:
await self._delete_channel(channel_id)
except Exception:
pass
async def _handle_stasis_start(
self,
channel_id: str,
@ -588,9 +714,15 @@ class ARIManager:
app_name = config["app_name"]
app_password = config["app_password"]
ws_client_name = config["ws_client_name"]
inbound_workflow_id = config.get("inbound_workflow_id")
conn = ARIConnection(
org_id, ari_endpoint, app_name, app_password, ws_client_name
org_id,
ari_endpoint,
app_name,
app_password,
ws_client_name,
inbound_workflow_id=inbound_workflow_id,
)
key = conn.connection_key
@ -604,9 +736,12 @@ class ARIManager:
self._connections[key] = conn
await conn.start()
else:
# Existing configuration - check if password changed
# Existing configuration - check if password or inbound_workflow_id changed
existing = self._connections[key]
if existing.app_password != app_password:
if (
existing.app_password != app_password
or existing.inbound_workflow_id != inbound_workflow_id
):
logger.info(
f"[ARI Manager] Config changed for org {org_id}, reconnecting..."
)
@ -666,6 +801,7 @@ class ARIManager:
"app_name": app_name,
"app_password": app_password,
"ws_client_name": ws_client_name,
"inbound_workflow_id": value.get("inbound_workflow_id"),
}
)

View file

@ -82,6 +82,7 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
"ari_endpoint": config.value.get("ari_endpoint"),
"app_name": config.value.get("app_name"),
"app_password": config.value.get("app_password"),
"inbound_workflow_id": config.value.get("inbound_workflow_id"),
"from_numbers": config.value.get("from_numbers", []),
}
else:

View file

@ -50,6 +50,7 @@ class ARIProvider(TelephonyProvider):
self.ari_endpoint = config.get("ari_endpoint", "").rstrip("/")
self.app_name = config.get("app_name", "")
self.app_password = config.get("app_password", "")
self.inbound_workflow_id = config.get("inbound_workflow_id")
self.from_numbers = config.get("from_numbers", [])
if isinstance(self.from_numbers, str):