dograh/api/routes/agent_stream.py
Abhishek 7fd3b96470
feat: agent stream for cloudonix OPBX (#261)
* feat: agent stream for cloudonix OPBX

* feat: make cloudonix app name optional

* feat: create application while configuring telephony config

* fix: get telephony configuration from stamped workflow run

* fix: fix vobiz hangup URL
2026-05-02 15:53:58 +05:30

138 lines
4.9 KiB
Python

"""Agent-stream WebSocket endpoint.
A single ``/agent-stream/{workflow_uuid}`` socket where a caller can drive
an agent run by passing everything inline in the query string — including
provider credentials. The standard ``/telephony/ws/...`` path requires a
``TelephonyConfigurationModel`` row stored in the org; this one does not.
Auth: the workflow UUID itself acts as the identifier — no API key.
Routing: when ``?provider=<registered>`` matches a telephony provider, we
dispatch to that provider's ``handle_external_websocket``. The raw-audio
branch (no provider) is reserved for a future protocol decision and
currently rejects with 1011.
"""
import uuid
from typing import Optional
from fastapi import APIRouter, WebSocket
from loguru import logger
from starlette.websockets import WebSocketDisconnect
from api.db import db_client
from api.enums import CallType, WorkflowRunState
from api.services.quota_service import check_dograh_quota_by_user_id
from api.services.telephony import registry as telephony_registry
from pipecat.utils.run_context import set_current_org_id, set_current_run_id
router = APIRouter(prefix="/agent-stream")
@router.websocket("/{workflow_uuid}")
async def agent_stream_websocket(
websocket: WebSocket,
workflow_uuid: str,
):
"""Generic agent-stream WebSocket.
Query params:
provider: registered telephony provider name (e.g. ``cloudonix``)
from / to / callId: call metadata persisted on the workflow run
...: provider-specific credentials/identifiers (e.g. ``session``,
``AccountSid``, ``CallSid`` for cloudonix)
Without ``provider`` the raw-audio branch is currently not implemented.
"""
await websocket.accept()
params = dict(websocket.query_params)
provider_name: Optional[str] = params.get("provider")
if not provider_name:
logger.warning(
f"agent-stream raw audio branch not yet supported "
f"(workflow_uuid={workflow_uuid})"
)
await websocket.close(code=1011, reason="Raw audio stream not yet implemented")
return
spec = telephony_registry.get_optional(provider_name)
if spec is None:
logger.warning(f"agent-stream unknown provider: {provider_name}")
await websocket.close(code=1008, reason=f"Unknown provider: {provider_name}")
return
workflow = await db_client.get_workflow_by_uuid_unscoped(workflow_uuid)
if not workflow:
logger.warning(f"agent-stream workflow {workflow_uuid} not found")
await websocket.close(code=1008, reason="Workflow not found")
return
quota_result = await check_dograh_quota_by_user_id(
workflow.user_id, workflow_id=workflow.id
)
if not quota_result.has_quota:
logger.warning(
f"agent-stream quota exceeded for user {workflow.user_id}: "
f"{quota_result.error_message}"
)
await websocket.close(
code=1008, reason=quota_result.error_message or "Quota exceeded"
)
return
numeric_suffix = int(str(uuid.uuid4()).replace("-", "")[:8], 16) % 100000000
workflow_run_name = f"WR-AGS-{numeric_suffix:08d}"
call_id = params.get("callId") or params.get("CallSid")
initial_context = {
**(workflow.template_context_variables or {}),
"provider": provider_name,
"caller_number": params.get("from"),
"called_number": params.get("to"),
"direction": "inbound",
}
workflow_run = await db_client.create_workflow_run(
workflow_run_name,
workflow.id,
provider_name,
user_id=workflow.user_id,
call_type=CallType.INBOUND,
initial_context=initial_context,
gathered_context={"call_id": call_id} if call_id else {},
logs={
"inbound_webhook": {
"domain": params.get("Domain"),
},
},
)
set_current_run_id(workflow_run.id)
set_current_org_id(workflow.organization_id)
await db_client.update_workflow_run(
run_id=workflow_run.id, state=WorkflowRunState.RUNNING.value
)
provider_instance = spec.provider_cls({})
try:
await provider_instance.handle_external_websocket(
websocket,
organization_id=workflow.organization_id,
workflow_id=workflow.id,
user_id=workflow.user_id,
workflow_run_id=workflow_run.id,
params=params,
)
except NotImplementedError as e:
logger.warning(f"agent-stream provider {provider_name} not supported: {e}")
try:
await websocket.close(code=1011, reason=str(e))
except RuntimeError:
pass
except WebSocketDisconnect as e:
logger.info(f"agent-stream disconnected: code={e.code} reason={e.reason}")
except Exception as e:
logger.error(f"agent-stream error for run {workflow_run.id}: {e}")
try:
await websocket.close(1011, "Internal server error")
except RuntimeError:
pass