feat: add Tone telephony provider (+91 TRAI-compliant via Exotel)

This commit is contained in:
Ridhith Arora 2026-06-07 15:56:06 +05:30
parent 49e68b49d5
commit 37d6eb8828
8 changed files with 664 additions and 0 deletions

View file

@ -20,6 +20,7 @@ class CallType(Enum):
class WorkflowRunMode(Enum):
ARI = "ari"
PLIVO = "plivo"
TONE = "tone"
TWILIO = "twilio"
VONAGE = "vonage"
VOBIZ = "vobiz"

View file

@ -10,6 +10,7 @@ from api.services.telephony.providers import ( # noqa: F401 -- import for side
ari,
cloudonix,
plivo,
tone,
telnyx,
twilio,
vobiz,

View file

@ -0,0 +1,66 @@
"""Tone telephony provider package."""
from api.services.telephony.registry import (
ProviderSpec,
ProviderUIField,
ProviderUIMetadata,
register,
)
from .config import ToneConfigurationRequest, ToneConfigurationResponse
from .provider import ToneProvider
from .transport import create_transport
def _config_loader(value: dict) -> dict:
return {
"provider": "tone",
"api_key": value.get("api_key"),
"from_numbers": value.get("from_numbers", []),
}
_UI_METADATA = ProviderUIMetadata(
display_name="Tone",
docs_url="https://docs.usetone.ai",
fields=[
ProviderUIField(
name="api_key",
label="API Key",
type="password",
sensitive=True,
description="Your Tone API key from usetone.ai/dashboard/api-keys",
),
ProviderUIField(
name="from_numbers",
label="Phone Numbers",
type="string-array",
description="E.164-formatted Tone phone numbers, e.g. +917314624707",
),
],
)
SPEC = ProviderSpec(
name="tone",
provider_cls=ToneProvider,
config_loader=_config_loader,
transport_factory=create_transport,
transport_sample_rate=8000,
config_request_cls=ToneConfigurationRequest,
ui_metadata=_UI_METADATA,
config_response_cls=ToneConfigurationResponse,
account_id_credential_field="api_key",
)
register(SPEC)
__all__ = [
"SPEC",
"ToneConfigurationRequest",
"ToneConfigurationResponse",
"ToneProvider",
"create_transport",
]

View file

@ -0,0 +1,24 @@
"""Tone telephony configuration schemas."""
from typing import List, Literal, Optional
from pydantic import BaseModel, Field
class ToneConfigurationRequest(BaseModel):
"""Request schema for Tone configuration."""
provider: Literal["tone"] = Field(default="tone")
api_key: str = Field(..., description="Tone API key (Bearer token)")
from_numbers: List[str] = Field(
default_factory=list,
description="E.164-formatted Tone phone numbers, e.g. ['+917314624707']",
)
class ToneConfigurationResponse(BaseModel):
"""Response schema for Tone configuration with masked sensitive fields."""
provider: Literal["tone"] = Field(default="tone")
api_key: str # Masked on return
from_numbers: List[str]

View file

@ -0,0 +1,394 @@
"""
Tone telephony provider implementation.
Tone (usetone.ai) provisions TRAI-compliant +91 numbers for AI agents,
built on Exotel as the underlying carrier.
Tone API: https://api.usetone.ai/v1
Auth: Authorization: Bearer <api_key>
Verified Tone API call fields:
Request: to, from, callType, webhookUrl
Response: id, status, to, from, callType, webhookUrl, createdAt
WebSocket protocol: Exotel Voicebot Applet (bidirectional)
Audio: 8 kHz, 16-bit PCM, base64-encoded
Events: connected start media* stop
The WSS URL is configured statically in Exotel App Bazaar NOT returned
from a webhook. This is different from Twilio/Plivo where TwiML is returned.
"""
import json
import random
from typing import TYPE_CHECKING, Any, Dict, List, Optional
import aiohttp
from fastapi import HTTPException
from loguru import logger
from api.db import db_client
from api.enums import WorkflowRunMode
from api.services.telephony.base import (
CallInitiationResult,
NormalizedInboundData,
ProviderSyncResult,
TelephonyProvider,
)
from api.utils.common import get_backend_endpoints
from api.utils.telephony_address import normalize_telephony_address
if TYPE_CHECKING:
from fastapi import WebSocket
TONE_API_BASE = "https://api.usetone.ai/v1"
class ToneProvider(TelephonyProvider):
"""Tone (usetone.ai) implementation of TelephonyProvider."""
PROVIDER_NAME = WorkflowRunMode.TONE.value
WEBHOOK_ENDPOINT = "tone-webhook"
def __init__(self, config: Dict[str, Any]):
self.api_key: str = config.get("api_key", "")
self.from_numbers: List[str] = config.get("from_numbers", [])
if isinstance(self.from_numbers, str):
self.from_numbers = [self.from_numbers]
# ------------------------------------------------------------------
# Validation
# ------------------------------------------------------------------
def validate_config(self) -> bool:
return bool(self.api_key and self.from_numbers)
# ------------------------------------------------------------------
# Outbound call
# ------------------------------------------------------------------
async def initiate_call(
self,
to_number: str,
webhook_url: str,
workflow_run_id: Optional[int] = None,
from_number: Optional[str] = None,
**kwargs: Any,
) -> CallInitiationResult:
if not self.validate_config():
raise ValueError("Tone provider not properly configured")
_from = from_number or random.choice(self.from_numbers)
# Tone API POST /v1/calls — verified field names
payload: Dict[str, Any] = {
"to": to_number,
"from": _from,
"callType": kwargs.pop("callType", "TRANSACTIONAL"),
"webhookUrl": webhook_url,
}
if workflow_run_id:
backend_endpoint, _ = await get_backend_endpoints()
payload["webhookUrl"] = (
f"{backend_endpoint}/api/v1/telephony/tone-webhook"
f"?workflow_run_id={workflow_run_id}"
)
async with aiohttp.ClientSession() as session:
async with session.post(
f"{TONE_API_BASE}/calls",
json=payload,
headers={"Authorization": f"Bearer {self.api_key}"},
) as response:
response_text = await response.text()
if response.status not in (200, 201, 202):
raise HTTPException(
status_code=response.status,
detail=f"Failed to initiate Tone call: {response_text}",
)
response_data = json.loads(response_text)
call_id = response_data.get("id", "")
if not call_id:
raise HTTPException(
status_code=500,
detail=f"Tone response missing call id: {response_data}",
)
return CallInitiationResult(
call_id=call_id,
status=response_data.get("status", "queued"),
caller_number=_from,
provider_metadata={"call_id": call_id},
raw_response=response_data,
)
# ------------------------------------------------------------------
# Call status
# ------------------------------------------------------------------
async def get_call_status(self, call_id: str) -> Dict[str, Any]:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{TONE_API_BASE}/calls/{call_id}",
headers={"Authorization": f"Bearer {self.api_key}"},
) as response:
if response.status != 200:
error = await response.text()
raise Exception(f"Failed to get Tone call status: {error}")
return await response.json()
# ------------------------------------------------------------------
# Phone numbers
# ------------------------------------------------------------------
async def get_available_phone_numbers(self) -> List[str]:
return self.from_numbers
# ------------------------------------------------------------------
# Webhook response
# (Tone/Exotel doesn't use XML — WSS URL is set in App Bazaar)
# Required by abstract base but not used for this provider.
# ------------------------------------------------------------------
async def get_webhook_response(
self, workflow_id: int, user_id: int, workflow_run_id: int
) -> str:
# Tone does not use TwiML or Plivo XML. The WebSocket URL is configured
# statically in the Exotel App Bazaar Voicebot Applet. This method is
# defined to satisfy the abstract interface but should not be called.
logger.warning(
"[Tone] get_webhook_response called but Tone uses Exotel App Bazaar "
"for WebSocket URL configuration, not webhook XML responses."
)
return ""
# ------------------------------------------------------------------
# Webhook / inbound signature — Exotel has no HMAC
# ------------------------------------------------------------------
async def verify_webhook_signature(
self, url: str, params: Dict[str, Any], signature: str
) -> bool:
# Exotel does not send an HMAC signature on HTTP callbacks.
# Auth is via IP whitelisting or Basic Auth in the callback URL.
# Always return True — harden via IP whitelist in production.
return True
async def verify_inbound_signature(
self,
url: str,
webhook_data: Dict[str, Any],
headers: Dict[str, str],
body: str = "",
) -> bool:
return True
# ------------------------------------------------------------------
# Status callback normalization
# ------------------------------------------------------------------
def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
# Exotel Passthru applet sends form-encoded params
status_map = {
"initiated": "initiated",
"ringing": "ringing",
"in-progress": "answered",
"answered": "answered",
"completed": "completed",
"failed": "failed",
"busy": "busy",
"no-answer": "no-answer",
"canceled": "canceled",
"cancelled": "canceled",
}
raw_status = (data.get("Status") or data.get("status") or "").lower()
return {
"call_id": data.get("CallSid") or data.get("id", ""),
"status": status_map.get(raw_status, raw_status),
"from_number": data.get("From") or data.get("from"),
"to_number": data.get("To") or data.get("to"),
"direction": data.get("Direction"),
"duration": data.get("Duration") or data.get("duration"),
"extra": data,
}
# ------------------------------------------------------------------
# WebSocket audio (Exotel Voicebot Applet)
# ------------------------------------------------------------------
async def handle_websocket(
self,
websocket: "WebSocket",
workflow_id: int,
user_id: int,
workflow_run_id: int,
) -> None:
from api.services.pipecat.run_pipeline import run_pipeline_telephony
# Exotel sends: {"event":"connected"} then {"event":"start", "start":{...}}
stream_sid = None
call_sid = None
for _ in range(2):
raw = await websocket.receive_text()
msg = json.loads(raw)
if msg.get("event") == "start":
start = msg.get("start", {})
stream_sid = start.get("stream_sid") or msg.get("stream_sid")
call_sid = start.get("call_sid")
break
if not stream_sid:
logger.error(f"[Tone] Missing stream_sid in start event for run {workflow_run_id}")
await websocket.close(code=4400, reason="Missing stream_sid")
return
# Resolve call_id from DB context (stored by tone-webhook) or from start event
if not call_sid:
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if workflow_run and workflow_run.gathered_context:
call_sid = workflow_run.gathered_context.get("call_id")
logger.info(
f"[Tone] WebSocket connected: stream_sid={stream_sid} "
f"call_sid={call_sid} run={workflow_run_id}"
)
await run_pipeline_telephony(
websocket,
provider_name=self.PROVIDER_NAME,
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
user_id=user_id,
call_id=call_sid or "",
transport_kwargs={"stream_sid": stream_sid, "call_sid": call_sid},
)
# ------------------------------------------------------------------
# Call cost
# ------------------------------------------------------------------
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
try:
call_data = await self.get_call_status(call_id)
return {
"cost_usd": float(call_data.get("cost") or 0),
"duration": int(call_data.get("duration") or 0),
"status": call_data.get("status", "unknown"),
"price_unit": "INR", # Tone is India-first
"raw_response": call_data,
}
except Exception as e:
logger.error(f"[Tone] Exception fetching call cost for {call_id}: {e}")
return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)}
# ------------------------------------------------------------------
# Inbound call support
# ------------------------------------------------------------------
@classmethod
def can_handle_webhook(
cls, webhook_data: Dict[str, Any], headers: Dict[str, str]
) -> bool:
# Exotel Passthru sends CallSid; no provider-specific signature header
return "CallSid" in webhook_data and "call_sid" not in headers
@staticmethod
def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData:
from_raw = webhook_data.get("From", "")
to_raw = webhook_data.get("To", "")
return NormalizedInboundData(
provider=ToneProvider.PROVIDER_NAME,
call_id=webhook_data.get("CallSid", ""),
from_number=normalize_telephony_address(from_raw).canonical if from_raw else "",
to_number=normalize_telephony_address(to_raw).canonical if to_raw else "",
direction=webhook_data.get("Direction", "inbound"),
call_status=webhook_data.get("Status", ""),
account_id=webhook_data.get("AccountSid"),
raw_data=webhook_data,
)
@staticmethod
def validate_account_id(config_data: dict, webhook_account_id: str) -> bool:
# Exotel doesn't send a consistent account_id in all webhooks; allow through
if not webhook_account_id:
return bool(config_data.get("api_key"))
return True
async def start_inbound_stream(
self,
*,
websocket_url: str,
workflow_run_id: int,
normalized_data: NormalizedInboundData,
backend_endpoint: str,
) -> Any:
# Tone/Exotel doesn't receive XML/JSON to start a stream.
# The Voicebot Applet in App Bazaar points directly at the WSS URL.
# Return a plain 200 acknowledgement.
from fastapi.responses import JSONResponse
return JSONResponse(content={"status": "ok"})
# ------------------------------------------------------------------
# Error responses
# ------------------------------------------------------------------
@staticmethod
def generate_error_response(error_type: str, message: str) -> tuple:
from fastapi.responses import JSONResponse
return JSONResponse(
content={"error": error_type, "message": message},
status_code=400,
)
@staticmethod
def generate_validation_error_response(error_type) -> tuple:
from fastapi.responses import JSONResponse
return JSONResponse(
content={"error": str(error_type), "message": "Validation failed"},
status_code=401,
)
# ------------------------------------------------------------------
# Call transfers
# ------------------------------------------------------------------
async def transfer_call(
self,
destination: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
raise NotImplementedError("Tone provider does not support call transfers")
def supports_transfers(self) -> bool:
return False
# ------------------------------------------------------------------
# Inbound configuration (App Bazaar — manual, not programmable via REST)
# ------------------------------------------------------------------
async def configure_inbound(
self, address: str, webhook_url: Optional[str]
) -> ProviderSyncResult:
# Exotel App Bazaar flows must be configured manually in the Exotel dashboard.
# Tone does not expose a REST API to update the Voicebot Applet WSS URL.
logger.info(
f"[Tone] configure_inbound for {address}: Exotel App Bazaar must be "
f"configured manually. WSS endpoint: {webhook_url or '(cleared)'}"
)
return ProviderSyncResult(
ok=True,
message=(
"Tone uses Exotel App Bazaar for call routing. "
"Update the Voicebot Applet WSS URL manually in your Exotel dashboard."
),
)

View file

@ -0,0 +1,109 @@
"""Tone telephony routes (webhooks, status callbacks).
Mounted under /api/v1/telephony by api.routes.telephony via the
provider registry see ProviderSpec.router.
Exotel sends:
- Passthru applet callbacks: form-urlencoded with CallSid, Status, etc.
- No HMAC signature authentication is via IP whitelisting or Basic Auth
embedded in the callback URL.
"""
import json
from fastapi import APIRouter, Request
from loguru import logger
from pipecat.utils.run_context import set_current_run_id
from starlette.responses import HTMLResponse
from api.db import db_client
from api.services.telephony.factory import get_telephony_provider_for_run
from api.services.telephony.status_processor import (
StatusCallbackRequest,
_process_status_update,
)
router = APIRouter()
async def _handle_tone_status_callback(workflow_run_id: int, request: Request):
set_current_run_id(workflow_run_id)
form_data = await request.form()
callback_data = dict(form_data)
logger.info(
f"[run {workflow_run_id}] Received Tone callback: {json.dumps(callback_data)}"
)
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.warning(f"Workflow run {workflow_run_id} not found for Tone callback")
return {"status": "ignored", "reason": "workflow_run_not_found"}
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
if not workflow:
logger.warning(f"Workflow {workflow_run.workflow_id} not found")
return {"status": "ignored", "reason": "workflow_not_found"}
provider = await get_telephony_provider_for_run(workflow_run, workflow.organization_id)
# Exotel has no HMAC signature — verify_inbound_signature is a pass-through
parsed_data = provider.parse_status_callback(callback_data)
status_update = StatusCallbackRequest(
call_id=parsed_data["call_id"],
status=parsed_data["status"],
from_number=parsed_data.get("from_number"),
to_number=parsed_data.get("to_number"),
direction=parsed_data.get("direction"),
duration=parsed_data.get("duration"),
extra=parsed_data.get("extra", {}),
)
await _process_status_update(workflow_run_id, status_update)
return {"status": "success"}
@router.post("/tone-webhook", include_in_schema=False)
async def handle_tone_webhook(
workflow_id: int,
user_id: int,
workflow_run_id: int,
organization_id: int,
request: Request,
):
"""
Handle initial webhook from Tone/Exotel when an outbound call is answered.
Tone does not use TwiML or Plivo XML. The WebSocket URL is configured
statically in the Exotel App Bazaar Voicebot Applet. This endpoint is used
as the webhookUrl in POST /v1/calls for status callbacks only.
"""
set_current_run_id(workflow_run_id)
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
form_data = await request.form()
callback_data = dict(form_data)
# Store Exotel CallSid so handle_websocket can find it
call_id = callback_data.get("CallSid") or callback_data.get("id", "")
if call_id and workflow_run:
gathered_context = dict(workflow_run.gathered_context or {})
gathered_context["call_id"] = call_id
await db_client.update_workflow_run(
run_id=workflow_run_id, gathered_context=gathered_context
)
# No XML response needed — Exotel doesn't parse a webhook response for call control
return {"status": "ok"}
@router.post("/tone/hangup-callback/{workflow_run_id}")
async def handle_tone_hangup_callback(workflow_run_id: int, request: Request):
"""Handle Tone/Exotel Passthru applet hangup callbacks."""
return await _handle_tone_status_callback(workflow_run_id, request)
@router.post("/tone/ring-callback/{workflow_run_id}")
async def handle_tone_ring_callback(workflow_run_id: int, request: Request):
"""Handle Tone/Exotel ring callbacks."""
return await _handle_tone_status_callback(workflow_run_id, request)

View file

@ -0,0 +1,5 @@
"""Exotel frame serializer (re-exported from pipecat)."""
from pipecat.serializers.exotel import ExotelFrameSerializer
__all__ = ["ExotelFrameSerializer"]

View file

@ -0,0 +1,64 @@
"""Tone (Exotel) transport factory."""
from fastapi import WebSocket
from pipecat.transports.websocket.fastapi import (
FastAPIWebsocketParams,
FastAPIWebsocketTransport,
)
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_mixer import build_audio_out_mixer
from api.services.pipecat.transport_params import realtime_param_overrides
from api.services.telephony.factory import load_credentials_for_transport
from .serializers import ExotelFrameSerializer
async def create_transport(
websocket: WebSocket,
workflow_run_id: int,
audio_config: AudioConfig,
organization_id: int,
*,
ambient_noise_config: dict | None = None,
telephony_configuration_id: int | None = None,
is_realtime: bool = False,
stream_sid: str,
call_sid: str | None = None,
):
"""Create a FastAPIWebsocketTransport for a Tone/Exotel call."""
config = await load_credentials_for_transport(
organization_id, telephony_configuration_id, expected_provider="tone"
)
api_key = config.get("api_key")
if not api_key:
raise ValueError(
f"Incomplete Tone configuration for organization {organization_id}"
)
serializer = ExotelFrameSerializer(
stream_sid=stream_sid,
call_sid=call_sid,
params=ExotelFrameSerializer.InputParams(
exotel_sample_rate=8000,
sample_rate=audio_config.pipeline_sample_rate,
),
)
mixer = await build_audio_out_mixer(
audio_config.transport_out_sample_rate, ambient_noise_config
)
return FastAPIWebsocketTransport(
websocket=websocket,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=audio_config.transport_in_sample_rate,
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_mixer=mixer,
serializer=serializer,
**realtime_param_overrides(is_realtime),
),
)