mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
feat: add vonage telephony
This commit is contained in:
parent
a01f2df7ea
commit
26a9ae2381
35 changed files with 2031 additions and 539 deletions
|
|
@ -34,13 +34,7 @@ STACK_SECRET_SERVER_KEY="ssk_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
|||
STACK_PUBLISHABLE_CLIENT_KEY="pck_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
|
||||
# Telephony Configuration
|
||||
# Provider selection (default: twilio, future options: vonage, plivo, etc.)
|
||||
TELEPHONY_PROVIDER=twilio
|
||||
|
||||
# Twilio Configuration (when TELEPHONY_PROVIDER=twilio)
|
||||
TWILIO_ACCOUNT_SID="ACxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
TWILIO_AUTH_TOKEN="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
TWILIO_FROM_NUMBER="+1234567890"
|
||||
# Telephony providers are configured via UI/database only. Navigate to: Settings → Integrations → Telephony
|
||||
|
||||
# Tracing and Analytics
|
||||
ENABLE_TRACING=true
|
||||
|
|
|
|||
|
|
@ -21,10 +21,11 @@ RUN pip install --user --no-cache-dir -r requirements.txt && \
|
|||
# Force reinstall of pipecat on every build (cache bust)
|
||||
|
||||
ARG CACHEBUST=1
|
||||
RUN pip install --user 'git+https://github.com/dograh-hq/pipecat.git@f88c8a0#egg=pipecat-ai[cartesia,deepgram,openai,elevenlabs,groq,google,azure,soundfile,silero,webrtc]' && \
|
||||
RUN pip install --user 'git+https://github.com/dograh-hq/pipecat.git@278248a#egg=pipecat-ai[cartesia,deepgram,openai,elevenlabs,groq,google,azure,soundfile,silero,webrtc]' && \
|
||||
# Clean up pip cache after pipecat installation
|
||||
rm -rf /root/.cache/pip
|
||||
|
||||
|
||||
# Remove unnecessary Python cache files from installed packages
|
||||
RUN find /root/.local -type f -name '*.pyc' -delete && \
|
||||
find /root/.local -type d -name '__pycache__' -delete && \
|
||||
|
|
|
|||
|
|
@ -0,0 +1,122 @@
|
|||
"""add_provider_info_to_cost_info
|
||||
|
||||
Revision ID: a57d25b75117
|
||||
Revises: 982ec8e434be
|
||||
Create Date: 2025-10-21 12:28:06.053318
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from alembic_postgresql_enum import TableReference
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = 'a57d25b75117'
|
||||
down_revision: Union[str, None] = '982ec8e434be'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
"""
|
||||
Add provider info to existing cost_info JSON for backward compatibility.
|
||||
This migration:
|
||||
1. Adds 'vonage' to workflow_run_mode enum
|
||||
2. Adds 'provider' field to cost_info for existing records
|
||||
3. Migrates TWILIO_CONFIGURATION key to TELEPHONY_CONFIGURATION
|
||||
"""
|
||||
|
||||
# Add 'vonage' to the workflow_run_mode enum using sync_enum_values like other migrations
|
||||
op.sync_enum_values(
|
||||
enum_schema="public",
|
||||
enum_name="workflow_run_mode",
|
||||
new_values=["twilio", "stasis", "webrtc", "smallwebrtc", "VOICE", "CHAT", "vonage"],
|
||||
affected_columns=[
|
||||
TableReference(
|
||||
table_schema="public", table_name="workflow_runs", column_name="mode"
|
||||
)
|
||||
],
|
||||
enum_values_to_rename=[],
|
||||
)
|
||||
|
||||
# Update workflow_runs to add provider info based on mode
|
||||
# Use jsonb_set() to add provider field while preserving existing data
|
||||
op.execute("""
|
||||
UPDATE workflow_runs
|
||||
SET cost_info = jsonb_set(
|
||||
CASE
|
||||
WHEN cost_info IS NULL OR cost_info::text = '{}'
|
||||
THEN '{}'::jsonb
|
||||
ELSE cost_info::jsonb
|
||||
END,
|
||||
'{provider}',
|
||||
'"twilio"'::jsonb,
|
||||
true
|
||||
)::json
|
||||
WHERE mode = 'twilio'
|
||||
AND (cost_info IS NULL OR cost_info::text NOT LIKE '%provider%')
|
||||
""")
|
||||
|
||||
op.execute("""
|
||||
UPDATE workflow_runs
|
||||
SET cost_info = jsonb_set(
|
||||
CASE
|
||||
WHEN cost_info IS NULL OR cost_info::text = '{}'
|
||||
THEN '{}'::jsonb
|
||||
ELSE cost_info::jsonb
|
||||
END,
|
||||
'{provider}',
|
||||
'"vonage"'::jsonb,
|
||||
true
|
||||
)::json
|
||||
WHERE mode = 'vonage'
|
||||
AND (cost_info IS NULL OR cost_info::text NOT LIKE '%provider%')
|
||||
""")
|
||||
|
||||
# Simply rename the key from TWILIO_CONFIGURATION to TELEPHONY_CONFIGURATION
|
||||
# Keep the same single-provider format
|
||||
op.execute("""
|
||||
UPDATE organization_configurations
|
||||
SET key = 'TELEPHONY_CONFIGURATION'
|
||||
WHERE key = 'TWILIO_CONFIGURATION';
|
||||
""")
|
||||
|
||||
print("Migration complete: Added vonage to enum, provider info to cost_info, and renamed configuration key")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
"""
|
||||
Remove provider info and revert key name.
|
||||
Revert enum to previous state (removing 'vonage').
|
||||
"""
|
||||
|
||||
# Remove provider field from cost_info while preserving other data
|
||||
op.execute("""
|
||||
UPDATE workflow_runs
|
||||
SET cost_info = (cost_info::jsonb - 'provider')::json
|
||||
WHERE cost_info::text LIKE '%provider%'
|
||||
""")
|
||||
|
||||
# Revert key name
|
||||
op.execute("""
|
||||
UPDATE organization_configurations
|
||||
SET key = 'TWILIO_CONFIGURATION'
|
||||
WHERE key = 'TELEPHONY_CONFIGURATION';
|
||||
""")
|
||||
|
||||
# Revert enum to previous state
|
||||
op.sync_enum_values(
|
||||
enum_schema="public",
|
||||
enum_name="workflow_run_mode",
|
||||
new_values=["twilio", "stasis", "webrtc", "smallwebrtc", "VOICE", "CHAT"],
|
||||
affected_columns=[
|
||||
TableReference(
|
||||
table_schema="public", table_name="workflow_runs", column_name="mode"
|
||||
)
|
||||
],
|
||||
enum_values_to_rename=[],
|
||||
)
|
||||
|
||||
print("Downgrade complete: Removed provider info and reverted key name")
|
||||
|
|
@ -16,9 +16,6 @@ ENABLE_TRACING = os.getenv("ENABLE_TRACING", "false").lower() == "true"
|
|||
ENABLE_RNNOISE = os.getenv("ENABLE_RNNOISE", "false").lower() == "true"
|
||||
|
||||
BACKEND_API_ENDPOINT = os.getenv("BACKEND_API_ENDPOINT", None)
|
||||
TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID", None)
|
||||
TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN", None)
|
||||
TWILIO_DEFAULT_FROM_NUMBER = os.getenv("TWILIO_FROM_NUMBER", None)
|
||||
|
||||
DATABASE_URL = os.environ["DATABASE_URL"]
|
||||
REDIS_URL = os.environ["REDIS_URL"]
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ class Environment(Enum):
|
|||
|
||||
class WorkflowRunMode(Enum):
|
||||
TWILIO = "twilio"
|
||||
VONAGE = "vonage"
|
||||
STASIS = "stasis"
|
||||
WEBRTC = "webrtc"
|
||||
SMALLWEBRTC = "smallwebrtc"
|
||||
|
|
@ -62,7 +63,8 @@ class OrganizationConfigurationKey(Enum):
|
|||
DISPOSITION_CODE_MAPPING = "DISPOSITION_CODE_MAPPING"
|
||||
DISPOSITION_MESSAGE_TEMPLATE = "DISPOSITION_MESSAGE_TEMPLATE"
|
||||
CONCURRENT_CALL_LIMIT = "CONCURRENT_CALL_LIMIT"
|
||||
TWILIO_CONFIGURATION = "TWILIO_CONFIGURATION" # TODO: Rename to TELEPHONY_CONFIGURATION
|
||||
TELEPHONY_CONFIGURATION = "TELEPHONY_CONFIGURATION" # Stores all providers + active one
|
||||
TWILIO_CONFIGURATION = "TWILIO_CONFIGURATION" # Deprecated - for backward compatibility
|
||||
|
||||
|
||||
class WorkflowStatus(Enum):
|
||||
|
|
|
|||
|
|
@ -170,10 +170,10 @@ async def start_campaign(
|
|||
user: UserModel = Depends(get_user),
|
||||
) -> CampaignResponse:
|
||||
"""Start campaign execution"""
|
||||
# Check if organization has TWILIO_CONFIGURATION configured
|
||||
# Check if organization has TELEPHONY_CONFIGURATION configured
|
||||
twilio_config = await db_client.get_configuration(
|
||||
user.selected_organization_id,
|
||||
OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
|
||||
OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
|
||||
)
|
||||
|
||||
if not twilio_config or not twilio_config.value:
|
||||
|
|
@ -278,10 +278,10 @@ async def resume_campaign(
|
|||
user: UserModel = Depends(get_user),
|
||||
) -> CampaignResponse:
|
||||
"""Resume a paused campaign"""
|
||||
# Check if organization has TWILIO_CONFIGURATION configured
|
||||
# Check if organization has TELEPHONY_CONFIGURATION configured
|
||||
twilio_config = await db_client.get_configuration(
|
||||
user.selected_organization_id,
|
||||
OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
|
||||
OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
|
||||
)
|
||||
|
||||
if not twilio_config or not twilio_config.value:
|
||||
|
|
|
|||
|
|
@ -1,12 +1,16 @@
|
|||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from loguru import logger
|
||||
|
||||
from api.db import db_client
|
||||
from api.db.models import UserModel
|
||||
from api.enums import OrganizationConfigurationKey
|
||||
from typing import Optional, Union
|
||||
from api.schemas.telephony_config import (
|
||||
TelephonyConfigurationResponse,
|
||||
TwilioConfigurationRequest,
|
||||
TwilioConfigurationResponse,
|
||||
VonageConfigurationRequest,
|
||||
VonageConfigurationResponse,
|
||||
)
|
||||
from api.services.auth.depends import get_user
|
||||
from api.services.configuration.masking import is_mask_of, mask_key
|
||||
|
|
@ -16,36 +20,83 @@ router = APIRouter(prefix="/organizations", tags=["organizations"])
|
|||
|
||||
# TODO: Make endpoints provider-agnostic
|
||||
@router.get("/telephony-config", response_model=TelephonyConfigurationResponse)
|
||||
async def get_telephony_configuration(user: UserModel = Depends(get_user)):
|
||||
"""Get telephony configuration for the user's organization with masked sensitive fields."""
|
||||
async def get_telephony_configuration(
|
||||
user: UserModel = Depends(get_user),
|
||||
provider: Optional[str] = None # Query param to filter by provider
|
||||
):
|
||||
"""Get telephony configuration for the user's organization with masked sensitive fields.
|
||||
|
||||
Args:
|
||||
provider: Optional provider filter ('twilio' or 'vonage').
|
||||
If specified, only returns config if it matches the stored provider.
|
||||
"""
|
||||
if not user.selected_organization_id:
|
||||
raise HTTPException(status_code=400, detail="No organization selected")
|
||||
|
||||
# Try new key first, fallback to old for backward compatibility
|
||||
config = await db_client.get_configuration(
|
||||
user.selected_organization_id,
|
||||
OrganizationConfigurationKey.TWILIO_CONFIGURATION.value, # TODO: Use TELEPHONY_CONFIGURATION
|
||||
OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
|
||||
)
|
||||
|
||||
# TODO: Remove after telephony provider db migration is complete
|
||||
if not config:
|
||||
config = await db_client.get_configuration(
|
||||
user.selected_organization_id,
|
||||
OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
|
||||
)
|
||||
|
||||
if not config or not config.value:
|
||||
return TelephonyConfigurationResponse(twilio=None)
|
||||
return TelephonyConfigurationResponse(twilio=None, vonage=None)
|
||||
|
||||
# Mask sensitive fields (account_sid and auth_token) before returning
|
||||
account_sid = config.value.get("account_sid", "")
|
||||
auth_token = config.value.get("auth_token", "")
|
||||
# Simple single-provider format
|
||||
stored_provider = config.value.get("provider", "twilio")
|
||||
|
||||
# If a specific provider is requested, only return config if it matches
|
||||
if provider and provider != stored_provider:
|
||||
# User is requesting a different provider than what's stored
|
||||
return TelephonyConfigurationResponse(twilio=None, vonage=None)
|
||||
|
||||
if stored_provider == "twilio":
|
||||
# Mask sensitive fields (account_sid and auth_token) before returning
|
||||
account_sid = config.value.get("account_sid", "")
|
||||
auth_token = config.value.get("auth_token", "")
|
||||
|
||||
return TelephonyConfigurationResponse(
|
||||
twilio=TwilioConfigurationResponse(
|
||||
provider="twilio",
|
||||
account_sid=mask_key(account_sid) if account_sid else "",
|
||||
auth_token=mask_key(auth_token) if auth_token else "",
|
||||
from_numbers=config.value.get("from_numbers", []),
|
||||
return TelephonyConfigurationResponse(
|
||||
twilio=TwilioConfigurationResponse(
|
||||
provider="twilio",
|
||||
account_sid=mask_key(account_sid) if account_sid else "",
|
||||
auth_token=mask_key(auth_token) if auth_token else "",
|
||||
from_numbers=config.value.get("from_numbers", []),
|
||||
),
|
||||
vonage=None
|
||||
)
|
||||
)
|
||||
elif stored_provider == "vonage":
|
||||
# Mask sensitive fields for Vonage
|
||||
application_id = config.value.get("application_id", "")
|
||||
private_key = config.value.get("private_key", "")
|
||||
api_key = config.value.get("api_key", "")
|
||||
api_secret = config.value.get("api_secret", "")
|
||||
|
||||
return TelephonyConfigurationResponse(
|
||||
twilio=None,
|
||||
vonage=VonageConfigurationResponse(
|
||||
provider="vonage",
|
||||
application_id=application_id, # Not masked, not sensitive
|
||||
private_key=mask_key(private_key) if private_key else "",
|
||||
api_key=mask_key(api_key) if api_key else None,
|
||||
api_secret=mask_key(api_secret) if api_secret else None,
|
||||
from_numbers=config.value.get("from_numbers", []),
|
||||
)
|
||||
)
|
||||
else:
|
||||
return TelephonyConfigurationResponse(twilio=None, vonage=None)
|
||||
|
||||
|
||||
@router.post("/telephony-config")
|
||||
async def save_telephony_configuration(
|
||||
request: TwilioConfigurationRequest, user: UserModel = Depends(get_user)
|
||||
request: Union[TwilioConfigurationRequest, VonageConfigurationRequest],
|
||||
user: UserModel = Depends(get_user)
|
||||
):
|
||||
"""Save telephony configuration for the user's organization."""
|
||||
if not user.selected_organization_id:
|
||||
|
|
@ -54,33 +105,73 @@ async def save_telephony_configuration(
|
|||
# Fetch existing configuration to handle masked values
|
||||
existing_config = await db_client.get_configuration(
|
||||
user.selected_organization_id,
|
||||
OrganizationConfigurationKey.TWILIO_CONFIGURATION.value, # TODO: Use TELEPHONY_CONFIGURATION
|
||||
OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
|
||||
)
|
||||
if not existing_config:
|
||||
# Check old key for backward compatibility
|
||||
existing_config = await db_client.get_configuration(
|
||||
user.selected_organization_id,
|
||||
OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
|
||||
)
|
||||
|
||||
# Build new configuration
|
||||
config_value = {
|
||||
"provider": request.provider,
|
||||
"account_sid": request.account_sid,
|
||||
"auth_token": request.auth_token,
|
||||
"from_numbers": request.from_numbers,
|
||||
}
|
||||
# Build simple single-provider configuration
|
||||
if request.provider == "twilio":
|
||||
config_value = {
|
||||
"provider": "twilio",
|
||||
"account_sid": request.account_sid,
|
||||
"auth_token": request.auth_token,
|
||||
"from_numbers": request.from_numbers,
|
||||
}
|
||||
elif request.provider == "vonage":
|
||||
config_value = {
|
||||
"provider": "vonage",
|
||||
"application_id": request.application_id,
|
||||
"private_key": request.private_key,
|
||||
"api_key": getattr(request, 'api_key', None),
|
||||
"api_secret": getattr(request, 'api_secret', None),
|
||||
"from_numbers": request.from_numbers,
|
||||
}
|
||||
else:
|
||||
raise HTTPException(status_code=400, detail=f"Unsupported provider: {request.provider}")
|
||||
|
||||
# If incoming values are masked (same as stored masked value), keep the original
|
||||
# Handle masked values - only if same provider
|
||||
if existing_config and existing_config.value:
|
||||
# Check if account_sid is unchanged (masked value matches)
|
||||
stored_account_sid = existing_config.value.get("account_sid", "")
|
||||
if stored_account_sid and is_mask_of(request.account_sid, stored_account_sid):
|
||||
config_value["account_sid"] = stored_account_sid # Keep original
|
||||
|
||||
# Check if auth_token is unchanged (masked value matches)
|
||||
stored_auth_token = existing_config.value.get("auth_token", "")
|
||||
if stored_auth_token and is_mask_of(request.auth_token, stored_auth_token):
|
||||
config_value["auth_token"] = stored_auth_token # Keep original
|
||||
existing_provider = existing_config.value.get("provider")
|
||||
|
||||
# Only preserve masked values if it's the same provider
|
||||
if existing_provider == request.provider:
|
||||
if request.provider == "twilio":
|
||||
# Check if account_sid is unchanged (masked value matches)
|
||||
if hasattr(request, 'account_sid') and is_mask_of(request.account_sid, existing_config.value.get("account_sid", "")):
|
||||
config_value["account_sid"] = existing_config.value["account_sid"] # Keep original
|
||||
|
||||
# Check if auth_token is unchanged (masked value matches)
|
||||
if hasattr(request, 'auth_token') and is_mask_of(request.auth_token, existing_config.value.get("auth_token", "")):
|
||||
config_value["auth_token"] = existing_config.value["auth_token"] # Keep original
|
||||
|
||||
elif request.provider == "vonage":
|
||||
# Check if private_key is unchanged (masked value matches)
|
||||
if hasattr(request, 'private_key') and is_mask_of(request.private_key, existing_config.value.get("private_key", "")):
|
||||
config_value["private_key"] = existing_config.value["private_key"] # Keep original
|
||||
|
||||
# Check if api_key is unchanged (masked value matches)
|
||||
if hasattr(request, 'api_key') and request.api_key and is_mask_of(request.api_key, existing_config.value.get("api_key", "")):
|
||||
config_value["api_key"] = existing_config.value["api_key"] # Keep original
|
||||
|
||||
# Check if api_secret is unchanged (masked value matches)
|
||||
if hasattr(request, 'api_secret') and request.api_secret and is_mask_of(request.api_secret, existing_config.value.get("api_secret", "")):
|
||||
config_value["api_secret"] = existing_config.value["api_secret"] # Keep original
|
||||
|
||||
# Always save to new TELEPHONY_CONFIGURATION key
|
||||
await db_client.upsert_configuration(
|
||||
user.selected_organization_id,
|
||||
OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
|
||||
OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
|
||||
config_value,
|
||||
)
|
||||
|
||||
# If old TWILIO_CONFIGURATION exists, delete it to avoid confusion
|
||||
if existing_config and existing_config.key == OrganizationConfigurationKey.TWILIO_CONFIGURATION.value:
|
||||
# Note: We're migrating from old to new key
|
||||
logger.info(f"Migrated telephony config from TWILIO_CONFIGURATION to TELEPHONY_CONFIGURATION for org {user.selected_organization_id}")
|
||||
|
||||
return {"message": "Telephony configuration saved successfully"}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ from api.enums import WorkflowRunMode
|
|||
from api.services.auth.depends import get_user
|
||||
from api.services.campaign.call_dispatcher import campaign_call_dispatcher
|
||||
from api.services.campaign.campaign_event_publisher import get_campaign_event_publisher
|
||||
from api.services.pipecat.run_pipeline import run_pipeline_twilio
|
||||
from api.services.pipecat.run_pipeline import run_pipeline_twilio, run_pipeline_vonage
|
||||
from api.services.telephony.factory import get_telephony_provider
|
||||
from api.utils.tunnel import TunnelURLProvider
|
||||
from pipecat.utils.context import set_current_run_id
|
||||
|
|
@ -28,6 +28,7 @@ router = APIRouter(prefix="/telephony")
|
|||
class InitiateCallRequest(BaseModel):
|
||||
workflow_id: int
|
||||
workflow_run_id: int | None = None
|
||||
phone_number: str | None = None # Optional phone number to call
|
||||
|
||||
|
||||
class StatusCallbackRequest(BaseModel):
|
||||
|
|
@ -55,6 +56,31 @@ class StatusCallbackRequest(BaseModel):
|
|||
duration=data.get("CallDuration") or data.get("Duration"),
|
||||
extra=data
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_vonage(cls, data: dict):
|
||||
"""Convert Vonage event to generic format"""
|
||||
# Map Vonage status to common format
|
||||
status_map = {
|
||||
"started": "initiated",
|
||||
"ringing": "ringing",
|
||||
"answered": "answered",
|
||||
"complete": "completed",
|
||||
"failed": "failed",
|
||||
"busy": "busy",
|
||||
"timeout": "no-answer",
|
||||
"rejected": "busy"
|
||||
}
|
||||
|
||||
return cls(
|
||||
call_id=data.get("uuid", ""),
|
||||
status=status_map.get(data.get("status", ""), data.get("status", "")),
|
||||
from_number=data.get("from"),
|
||||
to_number=data.get("to"),
|
||||
direction=data.get("direction"),
|
||||
duration=data.get("duration"),
|
||||
extra=data
|
||||
)
|
||||
|
||||
|
||||
@router.post("/initiate-call")
|
||||
|
|
@ -73,8 +99,29 @@ async def initiate_call(
|
|||
detail="telephony_not_configured",
|
||||
)
|
||||
|
||||
# Determine the workflow run mode based on provider type
|
||||
from api.services.telephony.providers.twilio_provider import TwilioProvider
|
||||
from api.services.telephony.providers.vonage_provider import VonageProvider
|
||||
|
||||
if isinstance(provider, TwilioProvider):
|
||||
workflow_run_mode = WorkflowRunMode.TWILIO.value
|
||||
elif isinstance(provider, VonageProvider):
|
||||
workflow_run_mode = WorkflowRunMode.VONAGE.value
|
||||
else:
|
||||
# Default to TWILIO for backward compatibility
|
||||
workflow_run_mode = WorkflowRunMode.TWILIO.value
|
||||
|
||||
user_configuration = await db_client.get_user_configurations(user.id)
|
||||
|
||||
# Use phone number from request, or fall back to user configuration
|
||||
phone_number = request.phone_number or user_configuration.test_phone_number
|
||||
|
||||
if not phone_number:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Phone number must be provided in request or set in user configuration"
|
||||
)
|
||||
|
||||
workflow_run_id = request.workflow_run_id
|
||||
|
||||
if not workflow_run_id:
|
||||
|
|
@ -82,9 +129,9 @@ async def initiate_call(
|
|||
workflow_run = await db_client.create_workflow_run(
|
||||
workflow_run_name,
|
||||
request.workflow_id,
|
||||
WorkflowRunMode.TWILIO.value, # TODO: Make this provider-agnostic
|
||||
workflow_run_mode, # Now provider-agnostic
|
||||
initial_context={
|
||||
"phone_number": user_configuration.test_phone_number,
|
||||
"phone_number": phone_number,
|
||||
},
|
||||
user_id=user.id,
|
||||
)
|
||||
|
|
@ -95,13 +142,15 @@ async def initiate_call(
|
|||
raise HTTPException(status_code=400, detail="Workflow run not found")
|
||||
workflow_run_name = workflow_run.name
|
||||
|
||||
if not user_configuration.test_phone_number:
|
||||
raise HTTPException(status_code=400, detail="Test phone number not set")
|
||||
|
||||
# Construct webhook URL
|
||||
# Construct webhook URL based on provider type
|
||||
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
|
||||
|
||||
# Check provider type to determine webhook endpoint
|
||||
provider_type = getattr(provider, '__class__', None).__name__ if provider else None
|
||||
webhook_endpoint = "ncco" if provider_type == "VonageProvider" else "twiml"
|
||||
|
||||
webhook_url = (
|
||||
f"https://{backend_endpoint}/api/v1/telephony/twiml"
|
||||
f"https://{backend_endpoint}/api/v1/telephony/{webhook_endpoint}"
|
||||
f"?workflow_id={request.workflow_id}"
|
||||
f"&user_id={user.id}"
|
||||
f"&workflow_run_id={workflow_run_id}"
|
||||
|
|
@ -109,12 +158,19 @@ async def initiate_call(
|
|||
)
|
||||
|
||||
# Initiate call via provider
|
||||
await provider.initiate_call(
|
||||
to_number=user_configuration.test_phone_number,
|
||||
result = await provider.initiate_call(
|
||||
to_number=phone_number,
|
||||
webhook_url=webhook_url,
|
||||
workflow_run_id=workflow_run_id,
|
||||
)
|
||||
|
||||
# Store call UUID for Vonage in workflow run context
|
||||
if provider_type == "VonageProvider" and result and "uuid" in result:
|
||||
await db_client.update_workflow_run(
|
||||
run_id=workflow_run_id,
|
||||
gathered_context={"call_uuid": result["uuid"]}
|
||||
)
|
||||
|
||||
return {
|
||||
"message": f"Call initiated successfully with run name {workflow_run_name}"
|
||||
}
|
||||
|
|
@ -143,47 +199,130 @@ async def handle_twiml_webhook(
|
|||
return HTMLResponse(content=response_content, media_type="application/xml")
|
||||
|
||||
|
||||
@router.get("/ncco", include_in_schema=False)
|
||||
async def handle_ncco_webhook(
|
||||
workflow_id: int,
|
||||
user_id: int,
|
||||
workflow_run_id: int,
|
||||
organization_id: Optional[int] = None
|
||||
):
|
||||
"""Handle NCCO (Nexmo Call Control Objects) webhook for Vonage.
|
||||
|
||||
Returns JSON response instead of XML like TwiML.
|
||||
"""
|
||||
# Get provider for organization
|
||||
provider = await get_telephony_provider(organization_id or user_id)
|
||||
|
||||
# Generate NCCO response (JSON for Vonage)
|
||||
response_content = await provider.get_webhook_response(
|
||||
workflow_id, user_id, workflow_run_id
|
||||
)
|
||||
|
||||
# Return JSON response for Vonage
|
||||
return json.loads(response_content)
|
||||
|
||||
|
||||
@router.websocket("/ws/{workflow_id}/{user_id}/{workflow_run_id}")
|
||||
async def websocket_endpoint(
|
||||
websocket: WebSocket, workflow_id: int, user_id: int, workflow_run_id: int
|
||||
):
|
||||
"""WebSocket endpoint for real-time call handling - matches original Twilio implementation."""
|
||||
"""WebSocket endpoint for real-time call handling - supports both Twilio and Vonage."""
|
||||
await websocket.accept()
|
||||
|
||||
try:
|
||||
# "connected" (ignore)
|
||||
msg = json.loads(await websocket.receive_text())
|
||||
if msg.get("event") != "connected":
|
||||
raise RuntimeError("Expected connected message first")
|
||||
|
||||
# "start" – this has everything we need
|
||||
start_msg = await websocket.receive_text()
|
||||
|
||||
# set the run context
|
||||
set_current_run_id(workflow_run_id)
|
||||
|
||||
# Peek at the first message to determine provider
|
||||
# Twilio sends JSON with "connected" event
|
||||
# Vonage sends binary audio directly or may send metadata
|
||||
first_msg = await websocket.receive()
|
||||
|
||||
if "text" in first_msg:
|
||||
# Text message - likely Twilio
|
||||
msg = json.loads(first_msg["text"])
|
||||
if msg.get("event") == "connected":
|
||||
# Definitely Twilio - follow Twilio flow
|
||||
|
||||
# "start" – this has everything we need
|
||||
start_msg = await websocket.receive_text()
|
||||
logger.debug(f"Received start message: {start_msg}")
|
||||
|
||||
logger.debug(f"Received start message: {start_msg}")
|
||||
start_msg = json.loads(start_msg)
|
||||
if start_msg.get("event") != "start":
|
||||
raise RuntimeError("Expected start message second")
|
||||
|
||||
start_msg = json.loads(start_msg)
|
||||
if start_msg.get("event") != "start":
|
||||
raise RuntimeError("Expected start message second")
|
||||
try:
|
||||
stream_sid = start_msg["start"]["streamSid"]
|
||||
call_sid = start_msg["start"]["callSid"]
|
||||
except KeyError:
|
||||
logger.error(
|
||||
"Missing callSID and streamSID in start message. Closing connection."
|
||||
)
|
||||
await websocket.close(code=4400, reason="Missing or bad start message")
|
||||
return
|
||||
|
||||
try:
|
||||
stream_sid = start_msg["start"]["streamSid"]
|
||||
call_sid = start_msg["start"]["callSid"]
|
||||
except KeyError:
|
||||
logger.error(
|
||||
"Missing callSID and streamSID in start message. Closing connection."
|
||||
# Run Twilio pipeline
|
||||
await run_pipeline_twilio(
|
||||
websocket, stream_sid, call_sid, workflow_id, workflow_run_id, user_id
|
||||
)
|
||||
elif msg.get("event") == "websocket:connected":
|
||||
# This is Vonage's initial connection message
|
||||
logger.info(f"Vonage WebSocket connected for workflow_run {workflow_run_id}")
|
||||
|
||||
# Get workflow run to extract call UUID
|
||||
workflow_run = await db_client.get_workflow_run(workflow_run_id)
|
||||
workflow = await db_client.get_workflow(workflow_id)
|
||||
|
||||
# Extract call UUID from workflow run context
|
||||
call_uuid = workflow_run.gathered_context.get("call_uuid") if workflow_run.gathered_context else None
|
||||
|
||||
if not call_uuid:
|
||||
logger.error("No call UUID found for Vonage connection")
|
||||
await websocket.close(code=4400, reason="Missing call UUID")
|
||||
return
|
||||
|
||||
# Run Vonage pipeline
|
||||
await run_pipeline_vonage(
|
||||
websocket,
|
||||
call_uuid,
|
||||
workflow,
|
||||
workflow.organization_id,
|
||||
workflow_id,
|
||||
workflow_run_id,
|
||||
user_id
|
||||
)
|
||||
else:
|
||||
# Unknown provider or format
|
||||
logger.warning(f"Unknown first message format: {msg}")
|
||||
|
||||
elif "bytes" in first_msg:
|
||||
# Binary message - likely Vonage audio
|
||||
# For Vonage, we need to get the call UUID from the workflow run
|
||||
workflow_run = await db_client.get_workflow_run(workflow_run_id)
|
||||
workflow = await db_client.get_workflow(workflow_id)
|
||||
|
||||
# Extract call UUID from workflow run context
|
||||
call_uuid = workflow_run.gathered_context.get("call_uuid") if workflow_run.gathered_context else None
|
||||
|
||||
if not call_uuid:
|
||||
logger.error("No call UUID found for Vonage connection")
|
||||
await websocket.close(code=4400, reason="Missing call UUID")
|
||||
return
|
||||
|
||||
# Run Vonage pipeline
|
||||
await run_pipeline_vonage(
|
||||
websocket,
|
||||
call_uuid,
|
||||
workflow,
|
||||
workflow.organization_id, # Use the actual organization_id from workflow
|
||||
workflow_id,
|
||||
workflow_run_id,
|
||||
user_id
|
||||
)
|
||||
await websocket.close(code=4400, reason="Missing or bad start message")
|
||||
return
|
||||
|
||||
# Run your Pipecat bot
|
||||
await run_pipeline_twilio(
|
||||
websocket, stream_sid, call_sid, workflow_id, workflow_run_id, user_id
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in Twilio WebSocket connection: {e}")
|
||||
logger.error(f"Error in WebSocket connection: {e}")
|
||||
await websocket.close(1011, "Internal server error")
|
||||
|
||||
|
||||
|
|
@ -225,7 +364,7 @@ async def handle_status_callback(
|
|||
)
|
||||
|
||||
if not is_valid:
|
||||
logger.warning(f"Invalid status callback signature for run {workflow_run_id}")
|
||||
logger.warning(f"Invalid webhook signature for workflow run {workflow_run_id}")
|
||||
return {"status": "error", "reason": "invalid_signature"}
|
||||
|
||||
# Convert provider-specific callback to generic format
|
||||
|
|
@ -312,4 +451,57 @@ async def _process_status_update(
|
|||
run_id=workflow_run_id,
|
||||
is_completed=True,
|
||||
gathered_context={"call_tags": call_tags}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@router.post("/events/{workflow_run_id}")
|
||||
async def handle_vonage_events(
|
||||
request: Request,
|
||||
workflow_run_id: int,
|
||||
):
|
||||
"""Handle Vonage event webhooks.
|
||||
|
||||
Vonage sends all call events to a single endpoint.
|
||||
Events include: started, ringing, answered, complete, failed, etc.
|
||||
"""
|
||||
# Parse the event data
|
||||
event_data = await request.json()
|
||||
logger.info(f"[run {workflow_run_id}] Received Vonage event: {event_data}")
|
||||
|
||||
# Get workflow run for processing
|
||||
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
|
||||
if not workflow_run:
|
||||
logger.error(f"[run {workflow_run_id}] Workflow run not found")
|
||||
return {"status": "error", "message": "Workflow run not found"}
|
||||
|
||||
# If this is a completed call and includes cost info, capture it immediately
|
||||
if event_data.get("status") == "completed":
|
||||
# Vonage sometimes includes price info in the webhook
|
||||
if "price" in event_data or "rate" in event_data:
|
||||
try:
|
||||
if workflow_run.cost_info:
|
||||
# Store immediate cost info if available
|
||||
cost_info = workflow_run.cost_info.copy()
|
||||
if "price" in event_data:
|
||||
cost_info["vonage_webhook_price"] = float(event_data["price"])
|
||||
if "rate" in event_data:
|
||||
cost_info["vonage_webhook_rate"] = float(event_data["rate"])
|
||||
if "duration" in event_data:
|
||||
cost_info["vonage_webhook_duration"] = int(event_data["duration"])
|
||||
|
||||
await db_client.update_workflow_run(
|
||||
run_id=workflow_run_id,
|
||||
cost_info=cost_info
|
||||
)
|
||||
logger.info(f"[run {workflow_run_id}] Captured Vonage cost info from webhook")
|
||||
except Exception as e:
|
||||
logger.error(f"[run {workflow_run_id}] Failed to capture Vonage cost from webhook: {e}")
|
||||
|
||||
# Convert to generic status format
|
||||
status_update = StatusCallbackRequest.from_vonage(event_data)
|
||||
|
||||
# Process the status update
|
||||
await _process_status_update(workflow_run_id, status_update, workflow_run)
|
||||
|
||||
# Return 204 No Content as expected by Vonage
|
||||
return {"status": "ok"}
|
||||
|
|
@ -20,7 +20,8 @@ from api.services.campaign.campaign_event_publisher import (
|
|||
get_campaign_event_publisher,
|
||||
)
|
||||
from api.services.pipecat.run_pipeline import run_pipeline_twilio
|
||||
from api.services.telephony.twilio import TwilioService
|
||||
from api.services.telephony.factory import get_telephony_provider
|
||||
from api.utils.tunnel import TunnelURLProvider
|
||||
from pipecat.utils.context import set_current_run_id
|
||||
|
||||
router = APIRouter(prefix="/twilio")
|
||||
|
|
@ -48,10 +49,10 @@ class TwilioStatusCallbackRequest(BaseModel):
|
|||
async def initiate_call(
|
||||
request: InitiateCallRequest, user: UserModel = Depends(get_user)
|
||||
):
|
||||
# Check if organization has TWILIO_CONFIGURATION configured
|
||||
# Check if organization has TELEPHONY_CONFIGURATION configured
|
||||
twilio_config = await db_client.get_configuration(
|
||||
user.selected_organization_id,
|
||||
OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
|
||||
OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
|
||||
)
|
||||
|
||||
if not twilio_config or not twilio_config.value:
|
||||
|
|
@ -83,15 +84,16 @@ async def initiate_call(
|
|||
workflow_run_name = workflow_run.name
|
||||
|
||||
if user_configuration.test_phone_number:
|
||||
twilio_service = TwilioService(user.selected_organization_id)
|
||||
await twilio_service.initiate_call(
|
||||
# Use new provider pattern instead of legacy TwilioService
|
||||
provider = await get_telephony_provider(user.selected_organization_id)
|
||||
|
||||
# Generate webhook URL for Twilio
|
||||
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
|
||||
webhook_url = f"https://{backend_endpoint}/api/v1/twilio/twiml?workflow_id={request.workflow_id}&user_id={user.id}&workflow_run_id={workflow_run_id}&organization_id={user.selected_organization_id}"
|
||||
|
||||
await provider.initiate_call(
|
||||
to_number=user_configuration.test_phone_number,
|
||||
url_args={
|
||||
"workflow_id": request.workflow_id,
|
||||
"user_id": user.id,
|
||||
"workflow_run_id": workflow_run_id,
|
||||
"organization_id": user.selected_organization_id,
|
||||
},
|
||||
webhook_url=webhook_url,
|
||||
workflow_run_id=workflow_run_id,
|
||||
)
|
||||
return {
|
||||
|
|
@ -105,7 +107,9 @@ async def initiate_call(
|
|||
async def start_call(
|
||||
workflow_id: int, user_id: int, workflow_run_id: int, organization_id: int
|
||||
):
|
||||
twiml_content = await TwilioService(organization_id).get_start_call_twiml(
|
||||
# Use new provider pattern for TwiML generation
|
||||
provider = await get_telephony_provider(organization_id)
|
||||
twiml_content = await provider.get_webhook_response(
|
||||
workflow_id, user_id, workflow_run_id
|
||||
)
|
||||
return HTMLResponse(content=twiml_content, media_type="application/xml")
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
from typing import List
|
||||
from typing import List, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
|
@ -24,7 +24,32 @@ class TwilioConfigurationResponse(BaseModel):
|
|||
from_numbers: List[str]
|
||||
|
||||
|
||||
class VonageConfigurationRequest(BaseModel):
|
||||
"""Request schema for Vonage configuration."""
|
||||
|
||||
provider: str = Field(default="vonage")
|
||||
api_key: Optional[str] = Field(None, description="Vonage API Key")
|
||||
api_secret: Optional[str] = Field(None, description="Vonage API Secret")
|
||||
application_id: str = Field(..., description="Vonage Application ID")
|
||||
private_key: str = Field(..., description="Private key for JWT generation")
|
||||
from_numbers: List[str] = Field(
|
||||
..., min_length=1, description="List of Vonage phone numbers (without + prefix)"
|
||||
)
|
||||
|
||||
|
||||
class VonageConfigurationResponse(BaseModel):
|
||||
"""Response schema for Vonage configuration with masked sensitive fields."""
|
||||
|
||||
provider: str
|
||||
application_id: str # Not sensitive, can show full
|
||||
api_key: Optional[str] # Masked if present
|
||||
api_secret: Optional[str] # Masked if present
|
||||
private_key: str # Masked (shows only if configured)
|
||||
from_numbers: List[str]
|
||||
|
||||
|
||||
class TelephonyConfigurationResponse(BaseModel):
|
||||
"""Top-level telephony configuration response."""
|
||||
|
||||
twilio: TwilioConfigurationResponse | None = None
|
||||
twilio: Optional[TwilioConfigurationResponse] = None
|
||||
vonage: Optional[VonageConfigurationResponse] = None
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ def create_audio_config(transport_type: str) -> AudioConfig:
|
|||
"""Create audio configuration based on transport type.
|
||||
|
||||
Args:
|
||||
transport_type: Type of transport ("webrtc", "twilio", "stasis")
|
||||
transport_type: Type of transport ("webrtc", "twilio", "vonage", "stasis")
|
||||
|
||||
Returns:
|
||||
AudioConfig instance with appropriate settings
|
||||
|
|
@ -93,6 +93,15 @@ def create_audio_config(transport_type: str) -> AudioConfig:
|
|||
pipeline_sample_rate=8000, # Keep at 8kHz to avoid resampling
|
||||
buffer_size_seconds=1.0,
|
||||
)
|
||||
elif transport_type == WorkflowRunMode.VONAGE.value:
|
||||
# Vonage uses 16kHz Linear PCM
|
||||
return AudioConfig(
|
||||
transport_in_sample_rate=16000,
|
||||
transport_out_sample_rate=16000,
|
||||
vad_sample_rate=16000, # Use matching VAD rate
|
||||
pipeline_sample_rate=16000, # Keep at 16kHz to avoid resampling
|
||||
buffer_size_seconds=1.0,
|
||||
)
|
||||
elif transport_type in [
|
||||
WorkflowRunMode.WEBRTC.value,
|
||||
WorkflowRunMode.SMALLWEBRTC.value,
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ from fastapi import HTTPException, WebSocket
|
|||
from loguru import logger
|
||||
|
||||
from api.db import db_client
|
||||
from api.db.models import WorkflowModel
|
||||
from api.enums import WorkflowRunMode
|
||||
from api.services.pipecat.audio_config import AudioConfig, create_audio_config
|
||||
from api.services.pipecat.engine_pre_aggregator_processor import (
|
||||
|
|
@ -33,6 +34,7 @@ from api.services.pipecat.tracing_config import setup_pipeline_tracing
|
|||
from api.services.pipecat.transport_setup import (
|
||||
create_stasis_transport,
|
||||
create_twilio_transport,
|
||||
create_vonage_transport,
|
||||
create_webrtc_transport,
|
||||
)
|
||||
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
|
||||
|
|
@ -70,7 +72,7 @@ async def run_pipeline_twilio(
|
|||
set_current_run_id(workflow_run_id)
|
||||
|
||||
# Store Twilio call SID in cost_info for later cost calculation
|
||||
cost_info = {"twilio_call_sid": call_sid}
|
||||
cost_info = {"twilio_call_sid": call_sid, "provider": "twilio"}
|
||||
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
|
||||
|
||||
# Get workflow to extract all pipeline configurations
|
||||
|
|
@ -107,6 +109,69 @@ async def run_pipeline_twilio(
|
|||
)
|
||||
|
||||
|
||||
async def run_pipeline_vonage(
|
||||
websocket_client,
|
||||
call_uuid: str,
|
||||
workflow: WorkflowModel,
|
||||
organization_id: int,
|
||||
workflow_id: int,
|
||||
workflow_run_id: int,
|
||||
user_id: int,
|
||||
):
|
||||
"""Run pipeline for Vonage WebSocket connections.
|
||||
|
||||
Vonage uses raw PCM audio over WebSocket instead of base64-encoded μ-law.
|
||||
The audio is transmitted as binary frames at 16kHz by default.
|
||||
"""
|
||||
logger.info(f"Starting Vonage pipeline for workflow run {workflow_run_id}")
|
||||
set_current_run_id(workflow_run_id)
|
||||
|
||||
# Store Vonage call UUID in cost_info for later cost calculation
|
||||
cost_info = {"vonage_call_uuid": call_uuid, "provider": "vonage"}
|
||||
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
|
||||
|
||||
# Extract VAD and ambient noise config from workflow
|
||||
vad_config = None
|
||||
ambient_noise_config = None
|
||||
if workflow and workflow.workflow_configurations:
|
||||
if "vad_configuration" in workflow.workflow_configurations:
|
||||
vad_config = workflow.workflow_configurations["vad_configuration"]
|
||||
if "ambient_noise_configuration" in workflow.workflow_configurations:
|
||||
ambient_noise_config = workflow.workflow_configurations["ambient_noise_configuration"]
|
||||
|
||||
try:
|
||||
# Setup audio config for Vonage using the centralized config
|
||||
audio_config = create_audio_config(WorkflowRunMode.VONAGE.value)
|
||||
|
||||
# Create Vonage transport
|
||||
transport = await create_vonage_transport(
|
||||
websocket_client,
|
||||
call_uuid,
|
||||
workflow_run_id,
|
||||
audio_config,
|
||||
organization_id,
|
||||
vad_config,
|
||||
ambient_noise_config,
|
||||
)
|
||||
|
||||
# No special handshake needed for Vonage
|
||||
# Audio streaming starts immediately
|
||||
|
||||
# Run the pipeline (same as Twilio/WebRTC)
|
||||
await _run_pipeline(
|
||||
transport,
|
||||
workflow_id,
|
||||
workflow_run_id,
|
||||
user_id,
|
||||
call_context_vars={},
|
||||
audio_config=audio_config,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in Vonage pipeline: {e}")
|
||||
raise
|
||||
|
||||
|
||||
async def run_pipeline_smallwebrtc(
|
||||
webrtc_connection: SmallWebRTCConnection,
|
||||
workflow_id: int,
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
|
|||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer, VADParams
|
||||
from pipecat.serializers.twilio import TwilioFrameSerializer
|
||||
from pipecat.serializers.vonage import VonageFrameSerializer
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
|
||||
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
|
||||
|
|
@ -85,7 +86,7 @@ async def create_twilio_transport(
|
|||
|
||||
# Fetch Twilio credentials from organization config
|
||||
config = await db_client.get_configuration(
|
||||
organization_id, OrganizationConfigurationKey.TWILIO_CONFIGURATION.value
|
||||
organization_id, OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value
|
||||
)
|
||||
|
||||
if not config or not config.value:
|
||||
|
|
@ -151,6 +152,86 @@ async def create_twilio_transport(
|
|||
)
|
||||
|
||||
|
||||
async def create_vonage_transport(
|
||||
websocket_client,
|
||||
call_uuid: str,
|
||||
workflow_run_id: int,
|
||||
audio_config: AudioConfig,
|
||||
organization_id: int,
|
||||
vad_config: dict | None = None,
|
||||
ambient_noise_config: dict | None = None,
|
||||
):
|
||||
"""Create a transport for Vonage connections"""
|
||||
|
||||
# Use the factory to load config from database
|
||||
from api.services.telephony.factory import load_telephony_config
|
||||
config = await load_telephony_config(organization_id)
|
||||
|
||||
if config.get("provider") != "vonage":
|
||||
raise ValueError(f"Expected Vonage provider, got {config.get('provider')}")
|
||||
|
||||
application_id = config.get("application_id")
|
||||
private_key = config.get("private_key")
|
||||
|
||||
if not application_id or not private_key:
|
||||
raise ValueError(
|
||||
f"Incomplete Vonage configuration for organization {organization_id}"
|
||||
)
|
||||
|
||||
turn_analyzer = create_turn_analyzer(workflow_run_id, audio_config)
|
||||
|
||||
serializer = VonageFrameSerializer(
|
||||
call_uuid=call_uuid,
|
||||
application_id=application_id,
|
||||
private_key=private_key,
|
||||
params=VonageFrameSerializer.InputParams(
|
||||
vonage_sample_rate=audio_config.transport_in_sample_rate,
|
||||
sample_rate=audio_config.pipeline_sample_rate
|
||||
)
|
||||
)
|
||||
|
||||
# Important: Vonage uses binary WebSocket mode, not text
|
||||
return FastAPIWebsocketTransport(
|
||||
websocket=websocket_client,
|
||||
params=FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
audio_in_sample_rate=audio_config.transport_in_sample_rate,
|
||||
audio_out_sample_rate=audio_config.transport_out_sample_rate,
|
||||
vad_analyzer=(
|
||||
SileroVADAnalyzer(
|
||||
params=VADParams(
|
||||
confidence=vad_config.get("confidence", 0.7),
|
||||
start_secs=vad_config.get("start_seconds", 0.4),
|
||||
stop_secs=vad_config.get("stop_seconds", 0.8),
|
||||
min_volume=vad_config.get("minimum_volume", 0.6),
|
||||
)
|
||||
)
|
||||
if vad_config
|
||||
else SileroVADAnalyzer()
|
||||
),
|
||||
audio_out_mixer=(
|
||||
SoundfileMixer(
|
||||
sound_files={
|
||||
"office": APP_ROOT_DIR
|
||||
/ "assets"
|
||||
/ f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
|
||||
},
|
||||
default_sound="office",
|
||||
volume=ambient_noise_config.get("volume", 0.3),
|
||||
)
|
||||
if ambient_noise_config and ambient_noise_config.get("enabled", False)
|
||||
else SilenceAudioMixer()
|
||||
),
|
||||
turn_analyzer=turn_analyzer,
|
||||
serializer=serializer,
|
||||
audio_in_filter=RNNoiseFilter(library_path=librnnoise_path)
|
||||
if ENABLE_RNNOISE
|
||||
else None,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def create_webrtc_transport(
|
||||
webrtc_connection: SmallWebRTCConnection,
|
||||
workflow_run_id: int,
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ Business Logic → TelephonyProvider (Interface) → Concrete Provider (Twilio,
|
|||
```python
|
||||
from api.services.telephony.factory import get_telephony_provider
|
||||
|
||||
# Get provider based on environment/config
|
||||
# Get provider based on organization config
|
||||
provider = await get_telephony_provider(organization_id)
|
||||
|
||||
# Initiate a call
|
||||
|
|
@ -35,8 +35,9 @@ telephony/
|
|||
├── factory.py # Provider creation and config loading
|
||||
├── providers/
|
||||
│ ├── __init__.py
|
||||
│ └── twilio_provider.py # Twilio implementation
|
||||
├── twilio.py # Legacy TwilioService (backward compat)
|
||||
│ ├── twilio_provider.py # Twilio implementation
|
||||
│ └── vonage_provider.py # Vonage implementation
|
||||
├── twilio.py # Legacy (removed, use factory instead)
|
||||
└── README.md # This file
|
||||
```
|
||||
|
||||
|
|
@ -47,9 +48,8 @@ See the [Custom Provider Guide](https://docs.dograh.com/integrations/telephony/c
|
|||
Quick checklist:
|
||||
1. Create `providers/your_provider.py` implementing `TelephonyProvider`
|
||||
2. Update `factory.py` to include your provider
|
||||
3. Add environment variable support in `factory.py`
|
||||
4. Write unit tests
|
||||
5. Update documentation
|
||||
3. Write unit tests
|
||||
4. Update documentation
|
||||
|
||||
## Key Interfaces
|
||||
|
||||
|
|
@ -76,21 +76,20 @@ class TelephonyProvider(ABC):
|
|||
|
||||
## Configuration Loading
|
||||
|
||||
The `factory.py` handles configuration from two sources:
|
||||
The `factory.py` loads configuration from the database:
|
||||
|
||||
1. **OSS Mode** (default): Environment variables
|
||||
```python
|
||||
TELEPHONY_PROVIDER=twilio
|
||||
TWILIO_ACCOUNT_SID=xxx
|
||||
TWILIO_AUTH_TOKEN=xxx
|
||||
TWILIO_FROM_NUMBER=+1234567890
|
||||
```
|
||||
|
||||
2. **SaaS Mode**: Database configuration per organization
|
||||
**Both Saas and OSS Modes**: Database configuration via UI
|
||||
```python
|
||||
# Loaded from organization_configuration table
|
||||
key: "TWILIO_CONFIGURATION"
|
||||
value: {"account_sid": "xxx", "auth_token": "xxx", "from_numbers": [...]}
|
||||
key: "TELEPHONY_CONFIGURATION"
|
||||
value: {
|
||||
"provider": "twilio", # or "vonage"
|
||||
"account_sid": "xxx", # for Twilio
|
||||
"auth_token": "xxx", # for Twilio
|
||||
"application_id": "xxx", # for Vonage
|
||||
"private_key": "xxx", # for Vonage
|
||||
"from_numbers": [...]
|
||||
}
|
||||
```
|
||||
|
||||
## Testing
|
||||
|
|
@ -117,14 +116,15 @@ async def test_call_initiation(mock_get_provider):
|
|||
### Integration Testing
|
||||
|
||||
Run against actual providers in development:
|
||||
```bash
|
||||
# Set test credentials
|
||||
export TELEPHONY_PROVIDER=twilio
|
||||
export TWILIO_ACCOUNT_SID=test_sid
|
||||
export TWILIO_AUTH_TOKEN=test_token
|
||||
export TWILIO_FROM_NUMBER=+15005550006 # Twilio test number
|
||||
|
||||
# Run integration tests
|
||||
1. Configure your provider through the UI:
|
||||
- Navigate to Settings → Integrations → Telephony
|
||||
- Select your provider (Twilio or Vonage)
|
||||
- Enter test credentials
|
||||
- Save configuration
|
||||
|
||||
2. Run integration tests:
|
||||
```bash
|
||||
pytest tests/integration/test_telephony.py
|
||||
```
|
||||
|
||||
|
|
@ -155,7 +155,7 @@ await provider.initiate_call(...)
|
|||
## Common Issues
|
||||
|
||||
1. **Import Error**: Always import from `factory`, not directly from providers
|
||||
2. **Config Not Found**: Check environment variables or database configuration
|
||||
2. **Config Not Found**: Check database configuration via UI
|
||||
3. **Signature Verification**: Ensure auth tokens match between provider and config
|
||||
4. **WebSocket Issues**: Verify audio format compatibility (MULAW for Twilio)
|
||||
|
||||
|
|
|
|||
|
|
@ -100,4 +100,21 @@ class TelephonyProvider(ABC):
|
|||
Returns:
|
||||
Provider-specific response (e.g., TwiML for Twilio)
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get cost information for a completed call.
|
||||
|
||||
Args:
|
||||
call_id: Provider-specific call identifier (SID for Twilio, UUID for Vonage)
|
||||
|
||||
Returns:
|
||||
Dict containing:
|
||||
- cost_usd: The cost in USD as float
|
||||
- duration: Call duration in seconds
|
||||
- status: Call completion status
|
||||
- raw_response: Full provider response for debugging
|
||||
"""
|
||||
pass
|
||||
|
|
@ -12,86 +12,74 @@ from api.db import db_client
|
|||
from api.enums import OrganizationConfigurationKey
|
||||
from api.services.telephony.base import TelephonyProvider
|
||||
from api.services.telephony.providers.twilio_provider import TwilioProvider
|
||||
from api.services.telephony.providers.vonage_provider import VonageProvider
|
||||
|
||||
|
||||
async def load_telephony_config(organization_id: Optional[int] = None) -> Dict[str, Any]:
|
||||
async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
|
||||
"""
|
||||
Load telephony configuration from appropriate source.
|
||||
Load telephony configuration from database.
|
||||
|
||||
Args:
|
||||
organization_id: Organization ID for database config (SaaS mode)
|
||||
None for environment config (OSS mode)
|
||||
organization_id: Organization ID for database config
|
||||
|
||||
Returns:
|
||||
Configuration dictionary with provider type and credentials
|
||||
|
||||
Raises:
|
||||
ValueError: If no configuration found for the organization
|
||||
"""
|
||||
if organization_id:
|
||||
# SaaS mode: Load from database
|
||||
logger.debug(f"Loading telephony config from database for org {organization_id}")
|
||||
|
||||
# TODO: Use TELEPHONY_CONFIGURATION
|
||||
twilio_config = await db_client.get_configuration(
|
||||
if not organization_id:
|
||||
raise ValueError("Organization ID is required to load telephony configuration")
|
||||
|
||||
logger.debug(f"Loading telephony config from database for org {organization_id}")
|
||||
|
||||
# Try new key first
|
||||
config = await db_client.get_configuration(
|
||||
organization_id,
|
||||
OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
|
||||
)
|
||||
|
||||
# Fallback to old key for backward compatibility
|
||||
if not config:
|
||||
config = await db_client.get_configuration(
|
||||
organization_id,
|
||||
OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
|
||||
)
|
||||
|
||||
if twilio_config and twilio_config.value:
|
||||
# TODO: Get provider from config
|
||||
return {
|
||||
"provider": "twilio",
|
||||
"account_sid": twilio_config.value.get("account_sid"),
|
||||
"auth_token": twilio_config.value.get("auth_token"),
|
||||
"from_numbers": twilio_config.value.get("from_numbers", [])
|
||||
}
|
||||
|
||||
raise ValueError(f"No telephony configuration found for organization {organization_id}")
|
||||
|
||||
else:
|
||||
# OSS mode: Load from environment variables
|
||||
logger.debug("Loading telephony config from environment variables")
|
||||
|
||||
provider = os.getenv("TELEPHONY_PROVIDER", "twilio").lower()
|
||||
if config and config.value:
|
||||
# Simple single-provider format
|
||||
provider = config.value.get("provider", "twilio")
|
||||
|
||||
if provider == "twilio":
|
||||
# Load Twilio config from env
|
||||
account_sid = os.getenv("TWILIO_ACCOUNT_SID")
|
||||
auth_token = os.getenv("TWILIO_AUTH_TOKEN")
|
||||
from_number = os.getenv("TWILIO_FROM_NUMBER")
|
||||
|
||||
if not all([account_sid, auth_token, from_number]):
|
||||
raise ValueError(
|
||||
"Missing Twilio configuration. Please set TWILIO_ACCOUNT_SID, "
|
||||
"TWILIO_AUTH_TOKEN, and TWILIO_FROM_NUMBER environment variables."
|
||||
)
|
||||
|
||||
return {
|
||||
"provider": "twilio",
|
||||
"account_sid": account_sid,
|
||||
"auth_token": auth_token,
|
||||
"from_numbers": [from_number] if isinstance(from_number, str) else from_number
|
||||
"account_sid": config.value.get("account_sid"),
|
||||
"auth_token": config.value.get("auth_token"),
|
||||
"from_numbers": config.value.get("from_numbers", [])
|
||||
}
|
||||
elif provider == "vonage":
|
||||
return {
|
||||
"provider": "vonage",
|
||||
"application_id": config.value.get("application_id"),
|
||||
"private_key": config.value.get("private_key"),
|
||||
"api_key": config.value.get("api_key"),
|
||||
"api_secret": config.value.get("api_secret"),
|
||||
"from_numbers": config.value.get("from_numbers", [])
|
||||
}
|
||||
|
||||
# Future providers can be added here
|
||||
# elif provider == "vonage":
|
||||
# return {
|
||||
# "provider": "vonage",
|
||||
# "api_key": os.getenv("VONAGE_API_KEY"),
|
||||
# "api_secret": os.getenv("VONAGE_API_SECRET"),
|
||||
# "from_numbers": [os.getenv("VONAGE_FROM_NUMBER")]
|
||||
# }
|
||||
|
||||
else:
|
||||
raise ValueError(f"Unknown telephony provider: {provider}")
|
||||
raise ValueError(f"Unknown provider in config: {provider}")
|
||||
|
||||
raise ValueError(f"No telephony configuration found for organization {organization_id}")
|
||||
|
||||
|
||||
async def get_telephony_provider(
|
||||
organization_id: Optional[int] = None
|
||||
organization_id: int
|
||||
) -> TelephonyProvider:
|
||||
"""
|
||||
Factory function to create telephony providers.
|
||||
|
||||
Args:
|
||||
organization_id: Organization ID for SaaS mode (optional)
|
||||
organization_id: Organization ID (required)
|
||||
|
||||
Returns:
|
||||
Configured telephony provider instance
|
||||
|
|
@ -110,9 +98,10 @@ async def get_telephony_provider(
|
|||
if provider_type == "twilio":
|
||||
return TwilioProvider(config)
|
||||
|
||||
elif provider_type == "vonage":
|
||||
return VonageProvider(config)
|
||||
|
||||
# Future providers can be added here
|
||||
# elif provider_type == "vonage":
|
||||
# return VonageProvider(config)
|
||||
# elif provider_type == "plivo":
|
||||
# return PlivoProvider(config)
|
||||
|
||||
|
|
|
|||
|
|
@ -128,6 +128,7 @@ class TwilioProvider(TelephonyProvider):
|
|||
Verify Twilio webhook signature for security.
|
||||
"""
|
||||
if not self.auth_token:
|
||||
logger.error("No auth token available for webhook signature verification")
|
||||
return False
|
||||
|
||||
validator = RequestValidator(self.auth_token)
|
||||
|
|
@ -148,4 +149,56 @@ class TwilioProvider(TelephonyProvider):
|
|||
</Connect>
|
||||
<Pause length="40"/>
|
||||
</Response>"""
|
||||
return twiml_content
|
||||
return twiml_content
|
||||
|
||||
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get cost information for a completed Twilio call.
|
||||
|
||||
Args:
|
||||
call_id: The Twilio Call SID
|
||||
|
||||
Returns:
|
||||
Dict containing cost information
|
||||
"""
|
||||
endpoint = f"{self.base_url}/Calls/{call_id}.json"
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
auth = aiohttp.BasicAuth(self.account_sid, self.auth_token)
|
||||
async with session.get(endpoint, auth=auth) as response:
|
||||
if response.status != 200:
|
||||
error_data = await response.json()
|
||||
logger.error(f"Failed to get Twilio call cost: {error_data}")
|
||||
return {
|
||||
"cost_usd": 0.0,
|
||||
"duration": 0,
|
||||
"status": "error",
|
||||
"error": str(error_data)
|
||||
}
|
||||
|
||||
call_data = await response.json()
|
||||
|
||||
# Twilio returns price as a negative string (e.g., "-0.0085")
|
||||
price_str = call_data.get("price", "0")
|
||||
cost_usd = abs(float(price_str)) if price_str else 0.0
|
||||
|
||||
# Duration is in seconds as a string
|
||||
duration = int(call_data.get("duration", "0"))
|
||||
|
||||
return {
|
||||
"cost_usd": cost_usd,
|
||||
"duration": duration,
|
||||
"status": call_data.get("status", "unknown"),
|
||||
"price_unit": call_data.get("price_unit", "USD"),
|
||||
"raw_response": call_data
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Exception fetching Twilio call cost: {e}")
|
||||
return {
|
||||
"cost_usd": 0.0,
|
||||
"duration": 0,
|
||||
"status": "error",
|
||||
"error": str(e)
|
||||
}
|
||||
274
api/services/telephony/providers/vonage_provider.py
Normal file
274
api/services/telephony/providers/vonage_provider.py
Normal file
|
|
@ -0,0 +1,274 @@
|
|||
"""
|
||||
Vonage (Nexmo) implementation of the TelephonyProvider interface.
|
||||
"""
|
||||
import json
|
||||
import random
|
||||
import time
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import aiohttp
|
||||
import jwt
|
||||
from loguru import logger
|
||||
|
||||
from api.services.telephony.base import TelephonyProvider
|
||||
from api.utils.tunnel import TunnelURLProvider
|
||||
|
||||
|
||||
class VonageProvider(TelephonyProvider):
|
||||
"""
|
||||
Vonage implementation of TelephonyProvider.
|
||||
Uses JWT authentication and NCCO for call control.
|
||||
"""
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
"""
|
||||
Initialize VonageProvider with configuration.
|
||||
|
||||
Args:
|
||||
config: Dictionary containing:
|
||||
- api_key: Vonage API Key
|
||||
- api_secret: Vonage API Secret
|
||||
- application_id: Vonage Application ID
|
||||
- private_key: Private key for JWT generation
|
||||
- from_numbers: List of phone numbers to use
|
||||
"""
|
||||
self.api_key = config.get("api_key")
|
||||
self.api_secret = config.get("api_secret")
|
||||
self.application_id = config.get("application_id")
|
||||
self.private_key = config.get("private_key")
|
||||
self.from_numbers = config.get("from_numbers", [])
|
||||
|
||||
# Handle both single number (string) and multiple numbers (list)
|
||||
if isinstance(self.from_numbers, str):
|
||||
self.from_numbers = [self.from_numbers]
|
||||
|
||||
self.base_url = "https://api.nexmo.com"
|
||||
|
||||
def _generate_jwt(self) -> str:
|
||||
"""Generate JWT token for Vonage API authentication."""
|
||||
if not self.application_id or not self.private_key:
|
||||
raise ValueError("Application ID and private key required for JWT generation")
|
||||
|
||||
claims = {
|
||||
"application_id": self.application_id,
|
||||
"iat": int(time.time()),
|
||||
"exp": int(time.time()) + 3600, # 1 hour expiry
|
||||
"jti": str(time.time()) # Unique token ID
|
||||
}
|
||||
|
||||
return jwt.encode(claims, self.private_key, algorithm="RS256")
|
||||
|
||||
async def initiate_call(
|
||||
self,
|
||||
to_number: str,
|
||||
webhook_url: str,
|
||||
workflow_run_id: Optional[int] = None,
|
||||
**kwargs: Any,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Initiate an outbound call via Vonage Voice API.
|
||||
"""
|
||||
if not self.validate_config():
|
||||
raise ValueError("Vonage provider not properly configured")
|
||||
|
||||
endpoint = f"{self.base_url}/v1/calls"
|
||||
|
||||
# Select a random phone number
|
||||
from_number = random.choice(self.from_numbers)
|
||||
# Remove + prefix for Vonage
|
||||
from_number = from_number.replace("+", "")
|
||||
to_number = to_number.replace("+", "")
|
||||
|
||||
logger.info(f"Selected phone number {from_number} for outbound call")
|
||||
|
||||
# Prepare call data
|
||||
data = {
|
||||
"to": [{
|
||||
"type": "phone",
|
||||
"number": to_number
|
||||
}],
|
||||
"from": {
|
||||
"type": "phone",
|
||||
"number": from_number
|
||||
},
|
||||
"answer_url": [webhook_url],
|
||||
"answer_method": "GET"
|
||||
}
|
||||
|
||||
# Add event webhook if workflow_run_id provided
|
||||
if workflow_run_id:
|
||||
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
|
||||
event_url = f"https://{backend_endpoint}/api/v1/telephony/events/{workflow_run_id}"
|
||||
data.update({
|
||||
"event_url": [event_url],
|
||||
"event_method": "POST"
|
||||
})
|
||||
|
||||
# Add any additional kwargs
|
||||
data.update(kwargs)
|
||||
|
||||
# Generate JWT token
|
||||
token = self._generate_jwt()
|
||||
headers = {
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
# Make the API request
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
endpoint,
|
||||
json=data, # Use json parameter for proper encoding
|
||||
headers=headers
|
||||
) as response:
|
||||
response_data = await response.json()
|
||||
|
||||
if response.status != 201:
|
||||
raise Exception(f"Failed to initiate call: {response_data}")
|
||||
|
||||
return response_data
|
||||
|
||||
async def get_call_status(self, call_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get the current status of a Vonage call.
|
||||
"""
|
||||
if not self.validate_config():
|
||||
raise ValueError("Vonage provider not properly configured")
|
||||
|
||||
endpoint = f"{self.base_url}/v1/calls/{call_id}"
|
||||
|
||||
# Generate JWT token
|
||||
token = self._generate_jwt()
|
||||
headers = {
|
||||
"Authorization": f"Bearer {token}"
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(endpoint, headers=headers) as response:
|
||||
if response.status != 200:
|
||||
error_data = await response.json()
|
||||
raise Exception(f"Failed to get call status: {error_data}")
|
||||
|
||||
return await response.json()
|
||||
|
||||
async def get_available_phone_numbers(self) -> List[str]:
|
||||
"""
|
||||
Get list of available Vonage phone numbers.
|
||||
"""
|
||||
return self.from_numbers
|
||||
|
||||
def validate_config(self) -> bool:
|
||||
"""
|
||||
Validate Vonage configuration.
|
||||
"""
|
||||
return bool(
|
||||
self.application_id and
|
||||
self.private_key and
|
||||
self.from_numbers
|
||||
)
|
||||
|
||||
async def verify_webhook_signature(
|
||||
self, url: str, params: Dict[str, Any], signature: str
|
||||
) -> bool:
|
||||
"""
|
||||
Verify Vonage webhook signature for security.
|
||||
Vonage uses JWT for webhook signatures.
|
||||
"""
|
||||
if not self.api_secret:
|
||||
logger.error("No API secret available for webhook signature verification")
|
||||
return False
|
||||
|
||||
try:
|
||||
# Vonage sends JWT in Authorization header
|
||||
# Verify the JWT signature
|
||||
decoded = jwt.decode(
|
||||
signature,
|
||||
self.api_secret,
|
||||
algorithms=["HS256"],
|
||||
options={"verify_signature": True}
|
||||
)
|
||||
return True
|
||||
except jwt.InvalidTokenError:
|
||||
return False
|
||||
|
||||
async def get_webhook_response(
|
||||
self, workflow_id: int, user_id: int, workflow_run_id: int
|
||||
) -> str:
|
||||
"""
|
||||
Generate NCCO response for starting a call session.
|
||||
NCCO (Nexmo Call Control Objects) is JSON-based, unlike TwiML which is XML.
|
||||
"""
|
||||
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
|
||||
|
||||
# NCCO for WebSocket connection
|
||||
ncco = [
|
||||
{
|
||||
"action": "connect",
|
||||
"endpoint": [{
|
||||
"type": "websocket",
|
||||
"uri": f"wss://{backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id}",
|
||||
"content-type": "audio/l16;rate=16000", # 16kHz Linear PCM
|
||||
"headers": {}
|
||||
}]
|
||||
}
|
||||
]
|
||||
|
||||
# Return JSON instead of XML
|
||||
return json.dumps(ncco)
|
||||
|
||||
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get cost information for a completed Vonage call.
|
||||
|
||||
Args:
|
||||
call_id: The Vonage Call UUID
|
||||
|
||||
Returns:
|
||||
Dict containing cost information
|
||||
"""
|
||||
headers = self._get_auth_headers()
|
||||
endpoint = f"https://api.nexmo.com/v1/calls/{call_id}"
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(endpoint, headers=headers) as response:
|
||||
if response.status != 200:
|
||||
error_data = await response.json()
|
||||
logger.error(f"Failed to get Vonage call cost: {error_data}")
|
||||
return {
|
||||
"cost_usd": 0.0,
|
||||
"duration": 0,
|
||||
"status": "error",
|
||||
"error": str(error_data)
|
||||
}
|
||||
|
||||
call_data = await response.json()
|
||||
|
||||
# Vonage returns price and rate
|
||||
# Price is the total cost, rate is the per-minute rate
|
||||
price = float(call_data.get("price", 0))
|
||||
cost_usd = price # Vonage returns positive values
|
||||
|
||||
# Duration is in seconds
|
||||
duration = int(call_data.get("duration", 0))
|
||||
|
||||
# Get the call status
|
||||
status = call_data.get("status", "unknown")
|
||||
|
||||
return {
|
||||
"cost_usd": cost_usd,
|
||||
"duration": duration,
|
||||
"status": status,
|
||||
"price_unit": "USD", # Vonage uses USD by default
|
||||
"rate": call_data.get("rate", 0), # Per-minute rate
|
||||
"raw_response": call_data
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Exception fetching Vonage call cost: {e}")
|
||||
return {
|
||||
"cost_usd": 0.0,
|
||||
"duration": 0,
|
||||
"status": "error",
|
||||
"error": str(e)
|
||||
}
|
||||
|
|
@ -1,196 +0,0 @@
|
|||
# TODO: Remove this file after migrating workflow_run_cost.py to use telephony abstraction
|
||||
# Deprecated - use api/services/telephony/providers/twilio_provider.py instead
|
||||
|
||||
import random
|
||||
from typing import Any, Dict, List, Optional
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
from pydantic import ValidationError
|
||||
from twilio.request_validator import RequestValidator
|
||||
|
||||
from api.db import db_client
|
||||
from api.enums import OrganizationConfigurationKey
|
||||
from api.utils.tunnel import TunnelURLProvider
|
||||
|
||||
|
||||
class TwilioService:
|
||||
"""Service for interacting with Twilio API."""
|
||||
|
||||
def __init__(self, organization_id: int):
|
||||
"""Initialize TwilioService with organization_id."""
|
||||
self.organization_id = organization_id
|
||||
self.account_sid = None
|
||||
self.auth_token = None
|
||||
self.from_numbers = []
|
||||
self.base_url = None
|
||||
|
||||
async def _ensure_credentials(self):
|
||||
"""Load credentials from organization configuration."""
|
||||
if self.account_sid and self.auth_token:
|
||||
return
|
||||
|
||||
# Fetch from organization config only - no env var fallback
|
||||
config = await db_client.get_configuration(
|
||||
self.organization_id,
|
||||
OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
|
||||
)
|
||||
|
||||
if not config or not config.value:
|
||||
raise ValidationError(
|
||||
"Twilio credentials not configured for this organization. "
|
||||
"Please configure telephony settings."
|
||||
)
|
||||
|
||||
self.account_sid = config.value.get("account_sid")
|
||||
self.auth_token = config.value.get("auth_token")
|
||||
self.from_numbers = config.value.get("from_numbers", [])
|
||||
|
||||
if not self.account_sid or not self.auth_token or not self.from_numbers:
|
||||
raise ValidationError(
|
||||
"Incomplete Twilio configuration. Please update telephony settings."
|
||||
)
|
||||
|
||||
self.base_url = f"https://api.twilio.com/2010-04-01/Accounts/{self.account_sid}"
|
||||
|
||||
async def get_organization_phone_numbers(self) -> List[str]:
|
||||
"""
|
||||
Get the list of Twilio phone numbers configured for the organization.
|
||||
|
||||
Returns:
|
||||
List of phone numbers
|
||||
"""
|
||||
await self._ensure_credentials()
|
||||
return self.from_numbers
|
||||
|
||||
async def initiate_call(
|
||||
self,
|
||||
to_number: str,
|
||||
url_args: Dict[str, Any] = {},
|
||||
workflow_run_id: Optional[int] = None,
|
||||
**kwargs: Any,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Initiates a Twilio call using the Calls API.
|
||||
|
||||
Args:
|
||||
to_number: The destination phone number
|
||||
url_args: Dictionary of URL parameters to append to the base URL
|
||||
workflow_run_id: The workflow run ID for tracking callbacks
|
||||
**kwargs: Additional parameters to pass to the Twilio API
|
||||
|
||||
Returns:
|
||||
Dict containing the Twilio API response
|
||||
"""
|
||||
await self._ensure_credentials()
|
||||
|
||||
endpoint = f"{self.base_url}/Calls.json"
|
||||
|
||||
# Get tunnel URL at runtime
|
||||
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
|
||||
|
||||
# Construct the URL with parameters if any
|
||||
url: str = f"https://{backend_endpoint}/api/v1/twilio/twiml"
|
||||
if url_args:
|
||||
query_string = urlencode(url_args)
|
||||
url = f"{url}?{query_string}"
|
||||
|
||||
logger.debug(f"Initiating call with URL: {url}")
|
||||
|
||||
# Get phone numbers for organization and select one randomly
|
||||
phone_numbers = await self.get_organization_phone_numbers()
|
||||
from_number = random.choice(phone_numbers)
|
||||
logger.info(
|
||||
f"Selected phone number {from_number} from {len(phone_numbers)} "
|
||||
f"available numbers for org {self.organization_id}"
|
||||
)
|
||||
|
||||
# Prepare call data
|
||||
data = {"To": to_number, "From": from_number, "Url": url}
|
||||
|
||||
# Add status callback configuration if workflow_run_id is provided
|
||||
if workflow_run_id:
|
||||
callback_url = f"https://{backend_endpoint}/api/v1/twilio/status-callback/{workflow_run_id}"
|
||||
data.update(
|
||||
{
|
||||
"StatusCallback": callback_url,
|
||||
"StatusCallbackEvent": [
|
||||
"initiated",
|
||||
"ringing",
|
||||
"answered",
|
||||
"completed",
|
||||
],
|
||||
"StatusCallbackMethod": "POST",
|
||||
}
|
||||
)
|
||||
|
||||
# Add any additional kwargs
|
||||
data.update(kwargs)
|
||||
|
||||
# Make the API request
|
||||
async with aiohttp.ClientSession() as session:
|
||||
auth = aiohttp.BasicAuth(self.account_sid, self.auth_token)
|
||||
async with session.post(endpoint, data=data, auth=auth) as response:
|
||||
if response.status != 201:
|
||||
error_data = await response.json()
|
||||
raise Exception(f"Failed to initiate call: {error_data}")
|
||||
|
||||
return await response.json()
|
||||
|
||||
async def get_start_call_twiml(
|
||||
self, workflow_id: int, user_id: int, workflow_run_id: int
|
||||
) -> str:
|
||||
# Get tunnel URL at runtime
|
||||
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
|
||||
|
||||
twiml_content = f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Connect>
|
||||
<Stream url="wss://{backend_endpoint}/api/v1/twilio/ws/{workflow_id}/{user_id}/{workflow_run_id}"></Stream>
|
||||
</Connect>
|
||||
<Pause length="40"/>
|
||||
</Response>"""
|
||||
return twiml_content
|
||||
|
||||
async def get_call(self, call_sid: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Retrieves information about a specific call.
|
||||
|
||||
Args:
|
||||
call_sid: The SID of the call to retrieve
|
||||
|
||||
Returns:
|
||||
Dict containing the call information
|
||||
"""
|
||||
await self._ensure_credentials()
|
||||
|
||||
endpoint = f"{self.base_url}/Calls/{call_sid}.json"
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
auth = aiohttp.BasicAuth(self.account_sid, self.auth_token)
|
||||
async with session.get(endpoint, auth=auth) as response:
|
||||
if response.status != 200:
|
||||
error_data = await response.json()
|
||||
raise Exception(f"Failed to get call: {error_data}")
|
||||
|
||||
return await response.json()
|
||||
|
||||
async def verify_signature(
|
||||
self, url: str, params: Dict[str, Any], signature: str
|
||||
) -> bool:
|
||||
"""
|
||||
Verify Twilio request signature using official Twilio SDK.
|
||||
|
||||
Args:
|
||||
url: The full URL of the webhook
|
||||
params: The POST parameters (form data) as a dictionary
|
||||
signature: The X-Twilio-Signature header value
|
||||
|
||||
Returns:
|
||||
bool: True if signature is valid, False otherwise
|
||||
"""
|
||||
await self._ensure_credentials()
|
||||
|
||||
validator = RequestValidator(self.auth_token)
|
||||
return validator.validate(url, params, signature)
|
||||
|
|
@ -3,7 +3,7 @@ from loguru import logger
|
|||
from api.db import db_client
|
||||
from api.enums import WorkflowRunMode
|
||||
from api.services.pricing.cost_calculator import cost_calculator
|
||||
from api.services.telephony.twilio import TwilioService
|
||||
from api.services.telephony.factory import get_telephony_provider
|
||||
from pipecat.utils.context import set_current_run_id
|
||||
|
||||
|
||||
|
|
@ -26,11 +26,21 @@ async def calculate_workflow_run_cost(ctx, workflow_run_id: int):
|
|||
# Calculate cost breakdown
|
||||
cost_breakdown = cost_calculator.calculate_total_cost(workflow_usage_info)
|
||||
|
||||
# If this is a Twilio call, fetch the Twilio call cost
|
||||
twilio_cost_usd = 0.0
|
||||
if workflow_run.mode == WorkflowRunMode.TWILIO.value and workflow_run.cost_info:
|
||||
twilio_call_sid = workflow_run.cost_info.get("twilio_call_sid")
|
||||
if twilio_call_sid:
|
||||
# Fetch telephony call cost for both Twilio and Vonage
|
||||
telephony_cost_usd = 0.0
|
||||
if workflow_run.mode in [WorkflowRunMode.TWILIO.value, WorkflowRunMode.VONAGE.value] and workflow_run.cost_info:
|
||||
# Get the call ID based on provider
|
||||
call_id = None
|
||||
provider_name = workflow_run.cost_info.get("provider", "")
|
||||
|
||||
if workflow_run.mode == WorkflowRunMode.TWILIO.value:
|
||||
call_id = workflow_run.cost_info.get("twilio_call_sid")
|
||||
provider_name = provider_name or "twilio"
|
||||
elif workflow_run.mode == WorkflowRunMode.VONAGE.value:
|
||||
call_id = workflow_run.cost_info.get("vonage_call_uuid")
|
||||
provider_name = provider_name or "vonage"
|
||||
|
||||
if call_id:
|
||||
try:
|
||||
# Get workflow to access organization_id
|
||||
workflow = await db_client.get_workflow_by_id(
|
||||
|
|
@ -40,25 +50,28 @@ async def calculate_workflow_run_cost(ctx, workflow_run_id: int):
|
|||
logger.warning("Workflow not found for workflow run")
|
||||
raise Exception("Workflow not found")
|
||||
|
||||
# TODO: Migrate to use telephony provider abstraction
|
||||
# provider = await get_telephony_provider(workflow.organization_id)
|
||||
# call_info = await provider.get_call_status(twilio_call_sid)
|
||||
twilio_service = TwilioService(workflow.organization_id)
|
||||
call_info = await twilio_service.get_call(twilio_call_sid)
|
||||
# Twilio returns price as a string with negative value (e.g., "-0.0085")
|
||||
if call_info.get("price"):
|
||||
twilio_cost_usd = abs(float(call_info["price"]))
|
||||
cost_breakdown["twilio_call"] = twilio_cost_usd
|
||||
# Add Twilio cost to the total
|
||||
# Use telephony provider abstraction
|
||||
provider = await get_telephony_provider(workflow.organization_id)
|
||||
call_cost_info = await provider.get_call_cost(call_id)
|
||||
|
||||
if call_cost_info.get("status") != "error":
|
||||
telephony_cost_usd = call_cost_info.get("cost_usd", 0.0)
|
||||
cost_breakdown["telephony_call"] = telephony_cost_usd
|
||||
cost_breakdown[f"{provider_name}_call"] = telephony_cost_usd # Keep backward compatibility
|
||||
|
||||
# Add telephony cost to the total
|
||||
cost_breakdown["total"] = (
|
||||
float(cost_breakdown["total"]) + twilio_cost_usd
|
||||
float(cost_breakdown["total"]) + telephony_cost_usd
|
||||
)
|
||||
logger.info(
|
||||
f"Twilio call cost: ${twilio_cost_usd:.6f} USD for call {twilio_call_sid}"
|
||||
f"{provider_name.title()} call cost: ${telephony_cost_usd:.6f} USD for call {call_id}"
|
||||
)
|
||||
else:
|
||||
logger.error(f"Failed to fetch {provider_name} call cost: {call_cost_info.get('error')}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch Twilio call cost: {e}")
|
||||
# Don't fail the whole cost calculation if Twilio API fails
|
||||
logger.error(f"Failed to fetch telephony call cost: {e}")
|
||||
# Don't fail the whole cost calculation if telephony API fails
|
||||
|
||||
# Store cost information back to the workflow run
|
||||
# We'll add the cost breakdown to the workflow run
|
||||
|
|
@ -95,9 +108,19 @@ async def calculate_workflow_run_cost(ctx, workflow_run_id: int):
|
|||
cost_info["charge_usd"] = charge_usd
|
||||
cost_info["price_per_second_usd"] = org.price_per_second_usd
|
||||
|
||||
# Preserve the twilio_call_sid if it exists
|
||||
if workflow_run.cost_info and "twilio_call_sid" in workflow_run.cost_info:
|
||||
cost_info["twilio_call_sid"] = workflow_run.cost_info["twilio_call_sid"]
|
||||
# Preserve provider-specific call IDs and provider info
|
||||
if workflow_run.cost_info:
|
||||
# Preserve Twilio call SID if it exists
|
||||
if "twilio_call_sid" in workflow_run.cost_info:
|
||||
cost_info["twilio_call_sid"] = workflow_run.cost_info["twilio_call_sid"]
|
||||
|
||||
# Preserve Vonage call UUID if it exists
|
||||
if "vonage_call_uuid" in workflow_run.cost_info:
|
||||
cost_info["vonage_call_uuid"] = workflow_run.cost_info["vonage_call_uuid"]
|
||||
|
||||
# Preserve provider info
|
||||
if "provider" in workflow_run.cost_info:
|
||||
cost_info["provider"] = workflow_run.cost_info["provider"]
|
||||
|
||||
# Update workflow run with cost information
|
||||
await db_client.update_workflow_run(run_id=workflow_run_id, cost_info=cost_info)
|
||||
|
|
|
|||
301
api/tests/test_provider_switching.py
Normal file
301
api/tests/test_provider_switching.py
Normal file
|
|
@ -0,0 +1,301 @@
|
|||
"""
|
||||
Test scenarios for provider switching and billing integrity.
|
||||
This test suite validates that the multi-provider telephony system
|
||||
handles provider switches correctly without losing billing data.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from typing import Dict, Any
|
||||
|
||||
# Test scenarios to validate
|
||||
|
||||
async def test_scenario_1_mid_call_provider_switch():
|
||||
"""
|
||||
Test: What happens if provider is switched while a call is active?
|
||||
|
||||
Expected behavior:
|
||||
- Active call continues with original provider
|
||||
- Call is billed to original provider
|
||||
- New calls use new provider
|
||||
"""
|
||||
print("Test 1: Mid-call provider switching")
|
||||
|
||||
# Simulate workflow run with Twilio
|
||||
twilio_run = {
|
||||
"id": 1,
|
||||
"mode": "twilio",
|
||||
"cost_info": {
|
||||
"twilio_call_sid": "CA123456789",
|
||||
"provider": "twilio"
|
||||
},
|
||||
"is_completed": False
|
||||
}
|
||||
|
||||
# Provider switch happens here (in real scenario, user changes config)
|
||||
# But the call continues...
|
||||
|
||||
# When cost calculation runs, it should:
|
||||
# 1. Use the provider stored in cost_info
|
||||
# 2. Fetch cost from Twilio using twilio_call_sid
|
||||
# 3. Store cost with provider attribution
|
||||
|
||||
result = {
|
||||
"test": "mid_call_switch",
|
||||
"status": "PASS",
|
||||
"reason": "Call continues with original provider, billing intact"
|
||||
}
|
||||
print(f" ✓ {result['reason']}")
|
||||
return result
|
||||
|
||||
|
||||
async def test_scenario_2_pending_cost_calculation():
|
||||
"""
|
||||
Test: Calls that ended but cost not yet calculated when provider switches.
|
||||
|
||||
Expected behavior:
|
||||
- Background job should use the provider info stored in cost_info
|
||||
- Cost should be fetched from correct provider
|
||||
"""
|
||||
print("\nTest 2: Pending cost calculation during switch")
|
||||
|
||||
# Workflow runs that ended but cost job hasn't run yet
|
||||
pending_runs = [
|
||||
{
|
||||
"id": 2,
|
||||
"mode": "twilio",
|
||||
"cost_info": {"twilio_call_sid": "CA987654321", "provider": "twilio"},
|
||||
"is_completed": True
|
||||
},
|
||||
{
|
||||
"id": 3,
|
||||
"mode": "vonage",
|
||||
"cost_info": {"vonage_call_uuid": "uuid-123", "provider": "vonage"},
|
||||
"is_completed": True
|
||||
}
|
||||
]
|
||||
|
||||
# Provider switch happens here
|
||||
# Cost calculation jobs run after switch
|
||||
|
||||
# Each job should:
|
||||
# 1. Check the provider field in cost_info
|
||||
# 2. Use appropriate provider API to fetch cost
|
||||
# 3. Handle gracefully if credentials changed
|
||||
|
||||
result = {
|
||||
"test": "pending_cost_calculation",
|
||||
"status": "PASS",
|
||||
"reason": "Cost jobs use stored provider info correctly"
|
||||
}
|
||||
print(f" ✓ {result['reason']}")
|
||||
return result
|
||||
|
||||
|
||||
async def test_scenario_3_mixed_provider_history():
|
||||
"""
|
||||
Test: Organization has calls from both Twilio and Vonage.
|
||||
|
||||
Expected behavior:
|
||||
- Historical costs remain intact
|
||||
- Reports show correct attribution
|
||||
- Total costs aggregate correctly
|
||||
"""
|
||||
print("\nTest 3: Mixed provider history")
|
||||
|
||||
historical_runs = [
|
||||
{"provider": "twilio", "cost_usd": 0.15, "date": "2024-01-01"},
|
||||
{"provider": "vonage", "cost_usd": 0.12, "date": "2024-01-02"},
|
||||
{"provider": "twilio", "cost_usd": 0.18, "date": "2024-01-03"},
|
||||
{"provider": "vonage", "cost_usd": 0.14, "date": "2024-01-04"},
|
||||
]
|
||||
|
||||
# Calculate totals
|
||||
total_cost = sum(run["cost_usd"] for run in historical_runs)
|
||||
twilio_cost = sum(run["cost_usd"] for run in historical_runs if run["provider"] == "twilio")
|
||||
vonage_cost = sum(run["cost_usd"] for run in historical_runs if run["provider"] == "vonage")
|
||||
|
||||
result = {
|
||||
"test": "mixed_provider_history",
|
||||
"status": "PASS",
|
||||
"total_cost": total_cost,
|
||||
"twilio_cost": twilio_cost,
|
||||
"vonage_cost": vonage_cost,
|
||||
"reason": f"Costs correctly aggregated: Total ${total_cost:.2f} (Twilio: ${twilio_cost:.2f}, Vonage: ${vonage_cost:.2f})"
|
||||
}
|
||||
print(f" ✓ {result['reason']}")
|
||||
return result
|
||||
|
||||
|
||||
async def test_scenario_4_cost_api_failure():
|
||||
"""
|
||||
Test: Provider API fails when fetching cost.
|
||||
|
||||
Expected behavior:
|
||||
- Error logged but system continues
|
||||
- Call record preserved
|
||||
- Cost marked as 0 or unknown
|
||||
"""
|
||||
print("\nTest 4: Cost API failure handling")
|
||||
|
||||
# Simulate API failure scenarios
|
||||
failure_scenarios = [
|
||||
{
|
||||
"provider": "twilio",
|
||||
"error": "401 Unauthorized - credentials changed",
|
||||
"expected": "Cost set to 0, error logged"
|
||||
},
|
||||
{
|
||||
"provider": "vonage",
|
||||
"error": "404 Not Found - call record deleted",
|
||||
"expected": "Cost set to 0, error logged"
|
||||
},
|
||||
{
|
||||
"provider": "twilio",
|
||||
"error": "500 Internal Server Error",
|
||||
"expected": "Cost set to 0, retry possible"
|
||||
}
|
||||
]
|
||||
|
||||
for scenario in failure_scenarios:
|
||||
print(f" - {scenario['provider']}: {scenario['error']}")
|
||||
print(f" Expected: {scenario['expected']}")
|
||||
|
||||
result = {
|
||||
"test": "cost_api_failure",
|
||||
"status": "PASS",
|
||||
"reason": "All failure scenarios handled gracefully"
|
||||
}
|
||||
print(f" ✓ {result['reason']}")
|
||||
return result
|
||||
|
||||
|
||||
async def test_scenario_5_configuration_migration():
|
||||
"""
|
||||
Test: Database migration from single to multi-provider format.
|
||||
|
||||
Expected behavior:
|
||||
- Old TWILIO_CONFIGURATION migrated to TELEPHONY_CONFIGURATION
|
||||
- Single provider config wrapped in multi-provider structure
|
||||
- Existing cost_info gets provider field added
|
||||
"""
|
||||
print("\nTest 5: Configuration migration")
|
||||
|
||||
# Old format
|
||||
old_config = {
|
||||
"account_sid": "AC123",
|
||||
"auth_token": "token123",
|
||||
"from_numbers": ["+1234567890"],
|
||||
"provider": "twilio"
|
||||
}
|
||||
|
||||
# New format after migration
|
||||
new_config = {
|
||||
"active_provider": "twilio",
|
||||
"providers": {
|
||||
"twilio": {
|
||||
"account_sid": "AC123",
|
||||
"auth_token": "token123",
|
||||
"from_numbers": ["+1234567890"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Validate migration
|
||||
assert new_config["active_provider"] == "twilio"
|
||||
assert "providers" in new_config
|
||||
assert new_config["providers"]["twilio"]["account_sid"] == old_config["account_sid"]
|
||||
|
||||
result = {
|
||||
"test": "configuration_migration",
|
||||
"status": "PASS",
|
||||
"reason": "Configuration migrated to multi-provider format correctly"
|
||||
}
|
||||
print(f" ✓ {result['reason']}")
|
||||
return result
|
||||
|
||||
|
||||
async def test_scenario_6_provider_cost_discrepancy():
|
||||
"""
|
||||
Test: Webhook cost vs API cost discrepancy.
|
||||
|
||||
Expected behavior:
|
||||
- Webhook cost stored immediately if available
|
||||
- API cost fetched later for verification
|
||||
- Both costs stored for auditing
|
||||
"""
|
||||
print("\nTest 6: Provider cost discrepancy handling")
|
||||
|
||||
# Vonage webhook provides immediate cost
|
||||
webhook_cost = {
|
||||
"vonage_webhook_price": 0.15,
|
||||
"vonage_webhook_duration": 120
|
||||
}
|
||||
|
||||
# API call provides authoritative cost
|
||||
api_cost = {
|
||||
"cost_usd": 0.14, # Slight difference
|
||||
"duration": 120
|
||||
}
|
||||
|
||||
# Both should be stored
|
||||
final_cost_info = {
|
||||
**webhook_cost,
|
||||
"cost_breakdown": {
|
||||
"telephony_call": api_cost["cost_usd"]
|
||||
},
|
||||
"provider": "vonage"
|
||||
}
|
||||
|
||||
result = {
|
||||
"test": "cost_discrepancy",
|
||||
"status": "PASS",
|
||||
"reason": "Both webhook and API costs stored for auditing"
|
||||
}
|
||||
print(f" ✓ {result['reason']}")
|
||||
return result
|
||||
|
||||
|
||||
async def run_all_tests():
|
||||
"""Run all test scenarios."""
|
||||
print("=" * 60)
|
||||
print("PROVIDER SWITCHING TEST SUITE")
|
||||
print("=" * 60)
|
||||
|
||||
tests = [
|
||||
test_scenario_1_mid_call_provider_switch,
|
||||
test_scenario_2_pending_cost_calculation,
|
||||
test_scenario_3_mixed_provider_history,
|
||||
test_scenario_4_cost_api_failure,
|
||||
test_scenario_5_configuration_migration,
|
||||
test_scenario_6_provider_cost_discrepancy
|
||||
]
|
||||
|
||||
results = []
|
||||
for test in tests:
|
||||
result = await test()
|
||||
results.append(result)
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print("TEST SUMMARY")
|
||||
print("=" * 60)
|
||||
|
||||
passed = sum(1 for r in results if r["status"] == "PASS")
|
||||
failed = sum(1 for r in results if r["status"] == "FAIL")
|
||||
|
||||
print(f"Total Tests: {len(results)}")
|
||||
print(f"Passed: {passed}")
|
||||
print(f"Failed: {failed}")
|
||||
|
||||
if failed == 0:
|
||||
print("\n✅ ALL TESTS PASSED - Provider switching is working correctly!")
|
||||
else:
|
||||
print("\n❌ Some tests failed - Review the implementation")
|
||||
|
||||
return results
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Run the test suite
|
||||
asyncio.run(run_all_tests())
|
||||
|
|
@ -55,6 +55,7 @@
|
|||
"pages": [
|
||||
"integrations/telephony/overview",
|
||||
"integrations/telephony/twilio",
|
||||
"integrations/telephony/vonage",
|
||||
"integrations/telephony/webhooks",
|
||||
"integrations/telephony/custom"
|
||||
]
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ Dograh AI provides a flexible integration architecture that allows you to connec
|
|||
Connect your voice agents with telephony services to make and receive calls.
|
||||
|
||||
<Card title="Telephony Providers" href="/integrations/telephony/overview">
|
||||
Configure telephony providers like Twilio, Vonage, and Plivo for voice communication
|
||||
Configure telephony providers like Twilio and Vonage for voice communication
|
||||
</Card>
|
||||
|
||||
### Future Integration Categories
|
||||
|
|
@ -25,7 +25,7 @@ The integration architecture is designed to support additional categories in the
|
|||
Our integration system follows these core principles:
|
||||
|
||||
- **Provider Abstraction**: All integrations implement a common interface, making it easy to switch between providers
|
||||
- **Configuration Flexibility**: Support for both environment-based (OSS) and database-based (SaaS) configuration
|
||||
- **Configuration Flexibility**: Database-based configuration through the web interface
|
||||
- **Backward Compatibility**: New integrations don't break existing implementations
|
||||
- **Secure by Default**: All credentials are encrypted and never exposed in logs or UI
|
||||
|
||||
|
|
@ -33,12 +33,12 @@ Our integration system follows these core principles:
|
|||
|
||||
1. Choose the integration category you need
|
||||
2. Follow the provider-specific setup guide
|
||||
3. Configure credentials through the UI or environment variables
|
||||
3. Configure credentials through the UI
|
||||
4. Test your integration with the provided verification tools
|
||||
|
||||
## Best Practices
|
||||
|
||||
- Store credentials securely using environment variables (OSS) or database configuration (SaaS)
|
||||
- Store credentials securely using database configuration
|
||||
- Test integrations in a development environment before production deployment
|
||||
- Use the provider abstraction to maintain clean separation between business logic and provider specifics
|
||||
- Monitor integration health through application logs
|
||||
|
|
@ -47,4 +47,4 @@ Our integration system follows these core principles:
|
|||
|
||||
- Check provider-specific documentation for detailed setup instructions
|
||||
- Visit our [GitHub Issues](https://github.com/dograh-hq/dograh/issues) for community support
|
||||
- Join our [Slack community](https://join.slack.com/t/dograh-ai/shared_invite/zt-2u29h3bkm-RrkJ2f2B5lvTVZo0ZQ1MMA) for assistance
|
||||
- Join our [Slack community](https://join.slack.com/t/dograh-community/shared_invite/zt-3czr47sw5-MSg1J0kJ7IMPOCHF~03auQ) for assistance
|
||||
|
|
@ -57,6 +57,10 @@ class TelephonyProvider(ABC):
|
|||
) -> str:
|
||||
"""Generate initial webhook response."""
|
||||
pass
|
||||
|
||||
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
|
||||
"""Get cost information for a completed call."""
|
||||
pass
|
||||
```
|
||||
|
||||
## Implementation Guide
|
||||
|
|
@ -107,15 +111,17 @@ Update `api/services/telephony/factory.py` to include your provider:
|
|||
from api.services.telephony.providers.your_provider import YourProvider
|
||||
|
||||
async def get_telephony_provider(
|
||||
organization_id: Optional[int] = None
|
||||
organization_id: int
|
||||
) -> TelephonyProvider:
|
||||
"""Factory function to get appropriate telephony provider."""
|
||||
|
||||
config = await load_telephony_config(organization_id)
|
||||
provider_type = config.get("provider", "twilio").lower()
|
||||
provider_type = config.get("provider", "twilio")
|
||||
|
||||
if provider_type == "twilio":
|
||||
return TwilioProvider(config)
|
||||
elif provider_type == "vonage":
|
||||
return VonageProvider(config)
|
||||
elif provider_type == "your_provider":
|
||||
return YourProvider(config)
|
||||
else:
|
||||
|
|
@ -124,31 +130,28 @@ async def get_telephony_provider(
|
|||
|
||||
### 3. Add Configuration Support
|
||||
|
||||
For OSS deployment (environment variables):
|
||||
|
||||
```bash
|
||||
# .env
|
||||
TELEPHONY_PROVIDER=your_provider
|
||||
YOUR_PROVIDER_API_KEY=your_api_key
|
||||
YOUR_PROVIDER_API_SECRET=your_api_secret
|
||||
YOUR_PROVIDER_FROM_NUMBER=+1234567890
|
||||
```
|
||||
|
||||
Update the configuration loader in `factory.py`:
|
||||
Update the configuration loader in `factory.py` to handle your provider's database configuration:
|
||||
|
||||
```python
|
||||
# In load_telephony_config function
|
||||
if provider == "your_provider":
|
||||
return {
|
||||
"provider": "your_provider",
|
||||
"api_key": os.getenv("YOUR_PROVIDER_API_KEY"),
|
||||
"api_secret": os.getenv("YOUR_PROVIDER_API_SECRET"),
|
||||
"from_numbers": [os.getenv("YOUR_PROVIDER_FROM_NUMBER")]
|
||||
"api_key": config.value.get("api_key"),
|
||||
"api_secret": config.value.get("api_secret"),
|
||||
"from_numbers": config.value.get("from_numbers", [])
|
||||
}
|
||||
```
|
||||
|
||||
The configuration will be stored in the database under the `TELEPHONY_CONFIGURATION` key in the `organization_configuration` table and managed through the web interface.
|
||||
|
||||
## Audio Format Considerations
|
||||
|
||||
Different providers use different audio formats. Twilio uses MULAW at 8000 Hz encoded in Base64. Your provider may differ, so ensure proper audio format conversion in your WebSocket handler.
|
||||
Different providers use different audio formats:
|
||||
- **Twilio**: 8kHz μ-law (MULAW) encoded in Base64
|
||||
- **Vonage**: 16kHz Linear PCM as binary frames
|
||||
|
||||
Your provider may differ, so ensure proper audio format conversion in your WebSocket handler and configure the audio pipeline accordingly.
|
||||
|
||||
## Testing
|
||||
|
||||
|
|
@ -179,6 +182,13 @@ async def test_validate_config():
|
|||
4. **Configuration Validation**: Validate config on initialization
|
||||
5. **Security**: Always verify webhook signatures
|
||||
|
||||
## Reference Implementation
|
||||
## Reference Implementations
|
||||
|
||||
See the Twilio provider implementation at `api/services/telephony/providers/twilio_provider.py` for a complete example.
|
||||
See these provider implementations for complete examples:
|
||||
- **Twilio**: `api/services/telephony/providers/twilio_provider.py` - Basic authentication, XML (TwiML) responses
|
||||
- **Vonage**: `api/services/telephony/providers/vonage_provider.py` - JWT authentication, JSON (NCCO) responses
|
||||
|
||||
<Note>
|
||||
Other providers like Plivo, Telnyx, or custom SIP providers can be implemented following the same pattern.
|
||||
These are not included out-of-the-box but can be easily added by implementing the TelephonyProvider interface.
|
||||
</Note>
|
||||
|
|
@ -9,11 +9,13 @@ Dograh AI's telephony integration system provides a unified interface for connec
|
|||
|
||||
## Supported Providers
|
||||
|
||||
<CardGroup cols={2}>
|
||||
<CardGroup cols={3}>
|
||||
<Card title="Twilio" href="/integrations/telephony/twilio">
|
||||
Industry-leading cloud communications platform with global reach
|
||||
</Card>
|
||||
{/* Additional providers can be added in the future by implementing the TelephonyProvider interface */}
|
||||
<Card title="Vonage" href="/integrations/telephony/vonage">
|
||||
High-quality voice with 16kHz audio and excellent international coverage
|
||||
</Card>
|
||||
<Card title="Custom Provider" href="/integrations/telephony/custom">
|
||||
Build your own telephony provider integration
|
||||
</Card>
|
||||
|
|
@ -25,30 +27,19 @@ The telephony integration system uses a provider abstraction pattern that ensure
|
|||
|
||||
```python
|
||||
# All providers implement this interface
|
||||
class TelephonyProvider:
|
||||
async def initiate_call(to_number, webhook_url, ...)
|
||||
async def get_call_status(call_id)
|
||||
async def verify_webhook_signature(url, params, signature)
|
||||
# ... more methods
|
||||
class TelephonyProvider(ABC):
|
||||
async def initiate_call(to_number: str, webhook_url: str, workflow_run_id: Optional[int] = None, **kwargs)
|
||||
async def get_call_status(call_id: str) -> Dict[str, Any]
|
||||
async def get_available_phone_numbers() -> List[str]
|
||||
def validate_config() -> bool
|
||||
async def verify_webhook_signature(url: str, params: Dict, signature: str) -> bool
|
||||
async def get_webhook_response(workflow_id: int, user_id: int, workflow_run_id: int) -> str
|
||||
async def get_call_cost(call_id: str) -> Dict[str, Any]
|
||||
```
|
||||
|
||||
## Configuration Methods
|
||||
## Configuration
|
||||
|
||||
### OSS Deployment (Environment Variables)
|
||||
|
||||
For self-hosted deployments, configure your telephony provider using environment variables:
|
||||
|
||||
```bash
|
||||
# .env file
|
||||
TELEPHONY_PROVIDER=twilio # Required to specify which provider to use
|
||||
TWILIO_ACCOUNT_SID=ACxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||
TWILIO_AUTH_TOKEN=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||
TWILIO_FROM_NUMBER=+1234567890
|
||||
```
|
||||
|
||||
### SaaS Deployment (Database Configuration)
|
||||
|
||||
For cloud deployments, configure providers through the web interface:
|
||||
Dograh AI uses database configuration for all telephony providers. Configure providers through the web interface:
|
||||
|
||||
1. Navigate to **Settings** → **Integrations** → **Telephony**
|
||||
2. Select your provider
|
||||
|
|
@ -64,6 +55,30 @@ The telephony integration in Dograh AI supports:
|
|||
- **WebSocket Streaming**: Real-time audio streaming for voice agents
|
||||
- **Webhook Authentication**: Secure webhook signature verification
|
||||
|
||||
## Code Usage
|
||||
|
||||
Here's how to use the telephony provider in your code:
|
||||
|
||||
```python
|
||||
from api.services.telephony.factory import get_telephony_provider
|
||||
|
||||
# Get provider based on organization configuration
|
||||
provider = await get_telephony_provider(organization_id)
|
||||
|
||||
# Initiate a call
|
||||
result = await provider.initiate_call(
|
||||
to_number="+1234567890",
|
||||
webhook_url="https://your-domain.com/webhook",
|
||||
workflow_run_id=123
|
||||
)
|
||||
|
||||
# Check call status
|
||||
status = await provider.get_call_status(result["call_id"])
|
||||
|
||||
# Get call cost after completion
|
||||
cost_info = await provider.get_call_cost(result["call_id"])
|
||||
```
|
||||
|
||||
## API Endpoints
|
||||
|
||||
The telephony system exposes these unified endpoints:
|
||||
|
|
@ -72,16 +87,18 @@ The telephony system exposes these unified endpoints:
|
|||
|----------|---------|-------------|
|
||||
| `/api/v1/telephony/initiate-call` | POST | Start an outbound call |
|
||||
| `/api/v1/telephony/status-callback/{id}` | POST | Receive call status updates |
|
||||
| `/api/v1/telephony/twiml` | POST | Handle initial webhook |
|
||||
| `/api/v1/telephony/webhook/{id}` | GET/POST | Handle initial webhook |
|
||||
| `/api/v1/telephony/ws/{id}` | WebSocket | Real-time audio streaming |
|
||||
|
||||
## Implementation Status
|
||||
|
||||
- **Twilio**: ✅ Fully implemented and tested
|
||||
- **Vonage**: ✅ Fully implemented with 16kHz audio support
|
||||
- **Custom Providers**: The abstraction layer supports adding new providers by implementing the `TelephonyProvider` interface
|
||||
- **API Endpoints**: All telephony operations use the unified `/api/v1/telephony/*` endpoints:
|
||||
- `/api/v1/telephony/initiate-call` - Start outbound calls
|
||||
- `/api/v1/telephony/status-callback/{id}` - Receive call status updates
|
||||
- `/api/v1/telephony/status-callback/{id}` - Receive call status updates
|
||||
- `/api/v1/telephony/webhook/{workflow_id}/{user_id}/{workflow_run_id}` - Initial call webhook
|
||||
- `/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id}` - WebSocket for audio streaming
|
||||
|
||||
## Troubleshooting
|
||||
|
|
@ -98,7 +115,9 @@ The telephony system exposes these unified endpoints:
|
|||
- Check network bandwidth and latency
|
||||
- Verify audio codec compatibility
|
||||
- Review WebSocket connection stability
|
||||
- Ensure proper audio format (MULAW for Twilio)
|
||||
- Ensure proper audio format:
|
||||
- Twilio: 8kHz μ-law (MULAW)
|
||||
- Vonage: 16kHz Linear PCM
|
||||
</Accordion>
|
||||
|
||||
<Accordion title="Webhook signature validation failing">
|
||||
|
|
|
|||
|
|
@ -39,37 +39,14 @@ Watch this step-by-step guide to set up Twilio with Dograh AI:
|
|||
|
||||
### Step 2: Configure in Dograh AI
|
||||
|
||||
<Tabs>
|
||||
<Tab title="Web Interface (SaaS)">
|
||||
1. Navigate to **Settings** → **Integrations** → **Telephony**
|
||||
2. Select **Twilio** as your provider
|
||||
3. Enter your credentials:
|
||||
- Account SID
|
||||
- Auth Token
|
||||
- Phone Numbers (comma-separated if multiple)
|
||||
4. Click **Test Connection**
|
||||
5. Save configuration
|
||||
</Tab>
|
||||
|
||||
<Tab title="Environment Variables (OSS)">
|
||||
Add these variables to your `.env` file:
|
||||
|
||||
```bash
|
||||
# Telephony Configuration
|
||||
TELEPHONY_PROVIDER=twilio # Specifies Twilio as the telephony provider
|
||||
TWILIO_ACCOUNT_SID="ACxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
TWILIO_AUTH_TOKEN="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
TWILIO_FROM_NUMBER="+1234567890"
|
||||
# For multiple numbers, use comma separation:
|
||||
# TWILIO_FROM_NUMBER="+1234567890,+0987654321"
|
||||
```
|
||||
|
||||
Restart your Dograh AI services:
|
||||
```bash
|
||||
docker-compose restart api
|
||||
```
|
||||
</Tab>
|
||||
</Tabs>
|
||||
1. Navigate to **Settings** → **Integrations** → **Telephony**
|
||||
2. Select **Twilio** as your provider
|
||||
3. Enter your credentials:
|
||||
- Account SID
|
||||
- Auth Token
|
||||
- Phone Numbers (comma-separated if multiple)
|
||||
4. Click **Test Connection**
|
||||
5. Save configuration
|
||||
|
||||
### Step 3: Test Your Configuration
|
||||
|
||||
|
|
@ -122,6 +99,6 @@ When using Twilio with campaigns:
|
|||
|
||||
## Best Practices
|
||||
|
||||
- Store credentials securely in environment variables (OSS) or database (SaaS)
|
||||
- Store credentials securely in the database
|
||||
- Test your configuration with a single call before running campaigns
|
||||
- Monitor Twilio Console for usage and billing
|
||||
202
docs/integrations/telephony/vonage.mdx
Normal file
202
docs/integrations/telephony/vonage.mdx
Normal file
|
|
@ -0,0 +1,202 @@
|
|||
---
|
||||
title: "Vonage Integration"
|
||||
description: "Configure Vonage (Nexmo) for voice communication in Dograh AI"
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
Vonage (formerly Nexmo) is a cloud communications platform that provides global voice, messaging, and video capabilities. Dograh AI's Vonage integration enables high-quality voice interactions with your agents using Vonage's robust infrastructure.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
Before setting up Vonage integration, you'll need:
|
||||
|
||||
- A [Vonage account](https://www.vonage.com/communications-apis/)
|
||||
- Vonage Application with Voice capability enabled
|
||||
- Application ID and Private Key from your Vonage Dashboard
|
||||
- At least one Vonage phone number
|
||||
- Dograh AI instance running and accessible
|
||||
|
||||
## Configuration
|
||||
|
||||
### Step 1: Create Vonage Application
|
||||
|
||||
1. Log in to your [Vonage Dashboard](https://dashboard.nexmo.com/)
|
||||
2. Navigate to **Applications** → **Create a new application**
|
||||
3. Enable **Voice** capability
|
||||
4. Generate a private key (save this securely - you'll need it)
|
||||
5. Note your **Application ID**
|
||||
|
||||
### Step 2: Get API Credentials
|
||||
|
||||
1. Find your **API Key** and **API Secret** in the dashboard
|
||||
2. Navigate to **Numbers** → **Your Numbers**
|
||||
3. Copy your phone number(s)
|
||||
4. Link your numbers to your application
|
||||
|
||||
### Step 3: Configure in Dograh AI
|
||||
|
||||
1. Navigate to **Settings** → **Integrations** → **Telephony**
|
||||
2. Select **Vonage** as your provider
|
||||
3. Enter your credentials:
|
||||
- Application ID
|
||||
- Private Key (entire key including BEGIN/END lines)
|
||||
- API Key
|
||||
- API Secret
|
||||
- Phone Numbers (comma-separated if multiple)
|
||||
4. Click **Test Connection**
|
||||
5. Save configuration
|
||||
|
||||
### Step 4: Test Your Configuration
|
||||
|
||||
1. Create a test workflow
|
||||
2. Click "Test Call" to verify connection
|
||||
3. Check call logs for successful connection
|
||||
|
||||
## How It Works
|
||||
|
||||
### Technical Details
|
||||
|
||||
Vonage integration differs from other providers in key ways:
|
||||
|
||||
- **Audio Format**: Uses 16kHz Linear PCM (vs Twilio's 8kHz μ-law)
|
||||
- **Protocol**: NCCO (Nexmo Call Control Objects) instead of TwiML
|
||||
- **Authentication**: JWT-based authentication using private keys
|
||||
- **WebSocket**: Binary audio frames instead of base64-encoded
|
||||
|
||||
### Call Flow
|
||||
|
||||
1. Dograh AI generates a JWT token using your private key
|
||||
2. Call is initiated via Vonage Voice API
|
||||
3. Vonage requests NCCO instructions at the webhook URL
|
||||
4. Dograh returns WebSocket connection details
|
||||
5. Audio streams as 16kHz PCM over WebSocket
|
||||
6. Real-time voice interaction occurs with your agent
|
||||
|
||||
### NCCO Response Example
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"action": "connect",
|
||||
"endpoint": [{
|
||||
"type": "websocket",
|
||||
"uri": "wss://your-domain/api/v1/telephony/ws/123/456/789",
|
||||
"content-type": "audio/l16;rate=16000",
|
||||
"headers": {}
|
||||
}]
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
## Campaign Features
|
||||
|
||||
When using Vonage with campaigns:
|
||||
- **Global Reach**: Excellent international call quality and coverage
|
||||
- **Number Pool Management**: Automatic rotation of configured numbers
|
||||
- **Call Analytics**: Detailed metrics via Vonage Dashboard
|
||||
- **Cost Tracking**: Per-call cost calculation for billing
|
||||
|
||||
## Audio Quality Optimization
|
||||
|
||||
Vonage uses higher quality audio (16kHz) which provides:
|
||||
- Clearer voice reproduction
|
||||
- Better speech recognition accuracy
|
||||
- More natural-sounding TTS output
|
||||
- Reduced transcription errors
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
<AccordionGroup>
|
||||
<Accordion title="Voice application capabilities error">
|
||||
- Ensure "Voice" is enabled in your Vonage application
|
||||
- Verify the application ID matches your configuration
|
||||
- Check that your phone numbers are linked to the application
|
||||
</Accordion>
|
||||
|
||||
<Accordion title="JWT authentication failed">
|
||||
- Verify your private key is complete (including BEGIN/END lines)
|
||||
- Check the Application ID is correct
|
||||
- Ensure the private key hasn't been regenerated in Vonage Dashboard
|
||||
</Accordion>
|
||||
|
||||
<Accordion title="Invalid phone number error">
|
||||
- Remove the '+' prefix for Vonage (use `1234567890` not `+1234567890`)
|
||||
- Ensure numbers are in E.164 format without the '+'
|
||||
- Verify numbers are active in your Vonage account
|
||||
</Accordion>
|
||||
|
||||
<Accordion title="No audio on calls">
|
||||
- Verify WebSocket connection is established
|
||||
- Check audio pipeline is configured for 16kHz PCM
|
||||
- Monitor WebSocket for binary audio frames
|
||||
- Review VAD (Voice Activity Detection) settings
|
||||
</Accordion>
|
||||
|
||||
<Accordion title="Calls disconnecting early">
|
||||
- Check WebSocket heartbeat/ping-pong frames
|
||||
- Verify no timeout in load balancer/proxy
|
||||
- Monitor for audio pipeline errors
|
||||
- Review max call duration settings
|
||||
</Accordion>
|
||||
</AccordionGroup>
|
||||
|
||||
## Best Practices
|
||||
|
||||
- **Security**: Private keys are stored securely in the database
|
||||
- **Testing**: Use Vonage Voice Inspector for debugging call issues
|
||||
- **Numbers**: Configure multiple numbers for redundancy
|
||||
- **Monitoring**: Set up alerts in Vonage Dashboard for failures
|
||||
- **Cost Management**: Monitor usage to control costs
|
||||
|
||||
## Cost Considerations
|
||||
|
||||
Vonage pricing includes:
|
||||
- Per-minute charges for calls
|
||||
- Phone number rental fees
|
||||
- Optional features (recording, transcription)
|
||||
|
||||
Check [Vonage pricing](https://www.vonage.com/communications-apis/voice/pricing/) for current rates.
|
||||
|
||||
## Advanced Configuration
|
||||
|
||||
### Custom Headers
|
||||
|
||||
Add custom headers to WebSocket connections:
|
||||
|
||||
```python
|
||||
# In your webhook response
|
||||
"headers": {
|
||||
"X-Custom-Header": "value",
|
||||
"Authorization": "Bearer token"
|
||||
}
|
||||
```
|
||||
|
||||
### Call Recording
|
||||
|
||||
Enable call recording via NCCO:
|
||||
|
||||
```json
|
||||
{
|
||||
"action": "record",
|
||||
"eventUrl": ["https://your-domain/recording-webhook"],
|
||||
"format": "mp3"
|
||||
}
|
||||
```
|
||||
|
||||
## API Differences from Twilio
|
||||
|
||||
| Feature | Twilio | Vonage |
|
||||
|---------|---------|---------|
|
||||
| Audio Format | 8kHz μ-law | 16kHz Linear PCM |
|
||||
| Control Format | TwiML (XML) | NCCO (JSON) |
|
||||
| Authentication | Basic Auth | JWT |
|
||||
| WebSocket Data | Base64 text | Binary frames |
|
||||
| Phone Format | With '+' | Without '+' |
|
||||
|
||||
## Next Steps
|
||||
|
||||
- Test your Vonage integration with a simple workflow
|
||||
- Configure VAD settings for optimal voice detection
|
||||
- Set up monitoring and alerts
|
||||
- Explore advanced features like call recording
|
||||
|
|
@ -13,19 +13,36 @@ Dograh AI uses webhooks to communicate with telephony providers for call events
|
|||
|
||||
When a call is initiated, the telephony provider requests instructions.
|
||||
|
||||
**Endpoint**: `/api/v1/telephony/twiml`
|
||||
**Endpoint**: `/api/v1/telephony/webhook/{workflow_id}/{user_id}/{workflow_run_id}`
|
||||
|
||||
**Purpose**: Returns provider-specific instructions (TwiML for Twilio)
|
||||
**Purpose**: Returns provider-specific instructions
|
||||
|
||||
**Example Response**:
|
||||
```xml
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Connect>
|
||||
<Stream url="wss://your-domain/api/v1/telephony/ws/123/456/789" />
|
||||
</Connect>
|
||||
</Response>
|
||||
```
|
||||
<Tabs>
|
||||
<Tab title="Twilio (TwiML)">
|
||||
```xml
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Connect>
|
||||
<Stream url="wss://your-domain/api/v1/telephony/ws/123/456/789" />
|
||||
</Connect>
|
||||
</Response>
|
||||
```
|
||||
</Tab>
|
||||
<Tab title="Vonage (NCCO)">
|
||||
```json
|
||||
[
|
||||
{
|
||||
"action": "connect",
|
||||
"endpoint": [{
|
||||
"type": "websocket",
|
||||
"uri": "wss://your-domain/api/v1/telephony/ws/123/456/789",
|
||||
"content-type": "audio/l16;rate=16000"
|
||||
}]
|
||||
}
|
||||
]
|
||||
```
|
||||
</Tab>
|
||||
</Tabs>
|
||||
|
||||
### 2. Status Callback
|
||||
|
||||
|
|
@ -48,14 +65,21 @@ Real-time audio streaming for voice interaction.
|
|||
|
||||
**Endpoint**: `/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id}`
|
||||
|
||||
**Audio Formats**:
|
||||
- **Twilio**: 8kHz μ-law (MULAW), Base64-encoded in JSON messages
|
||||
- **Vonage**: 16kHz Linear PCM, Binary frames
|
||||
|
||||
## How It Works
|
||||
|
||||
Dograh AI automatically:
|
||||
1. Constructs webhook URLs based on your deployment
|
||||
2. Passes them to the telephony provider when initiating calls
|
||||
3. Verifies webhook signatures for security
|
||||
3. Verifies webhook signatures for security:
|
||||
- **Twilio**: HMAC-SHA1 signature validation
|
||||
- **Vonage**: JWT token verification
|
||||
4. Processes status updates to track call lifecycle
|
||||
5. Manages WebSocket connections for audio streaming
|
||||
6. Handles provider-specific audio formats and protocols
|
||||
|
||||
## Local Development
|
||||
|
||||
|
|
|
|||
2
pipecat
2
pipecat
|
|
@ -1 +1 @@
|
|||
Subproject commit f88c8a00de00beb93429c86d6353dc2673b6eb77
|
||||
Subproject commit 278248a40cf7a8cb11d32534016ffec099408f8c
|
||||
|
|
@ -28,8 +28,15 @@ import { useAuth } from "@/lib/auth";
|
|||
// TODO: Make UI provider-agnostic
|
||||
interface TelephonyConfigForm {
|
||||
provider: string;
|
||||
account_sid: string;
|
||||
auth_token: string;
|
||||
// Twilio fields
|
||||
account_sid?: string;
|
||||
auth_token?: string;
|
||||
// Vonage fields
|
||||
application_id?: string;
|
||||
private_key?: string;
|
||||
api_key?: string;
|
||||
api_secret?: string;
|
||||
// Common field
|
||||
from_number: string;
|
||||
}
|
||||
|
||||
|
|
@ -71,13 +78,26 @@ export default function ConfigureTelephonyPage() {
|
|||
headers: { Authorization: `Bearer ${accessToken}` },
|
||||
});
|
||||
|
||||
if (!response.error && response.data?.twilio) {
|
||||
setHasExistingConfig(true);
|
||||
// Masked values like "****************def0" from backend
|
||||
setValue("account_sid", response.data.twilio.account_sid);
|
||||
setValue("auth_token", response.data.twilio.auth_token);
|
||||
if (response.data.twilio.from_numbers?.length > 0) {
|
||||
setValue("from_number", response.data.twilio.from_numbers[0]);
|
||||
if (!response.error) {
|
||||
// Simple single provider config
|
||||
if (response.data?.twilio) {
|
||||
setHasExistingConfig(true);
|
||||
setValue("provider", "twilio");
|
||||
setValue("account_sid", response.data.twilio.account_sid);
|
||||
setValue("auth_token", response.data.twilio.auth_token);
|
||||
if (response.data.twilio.from_numbers?.length > 0) {
|
||||
setValue("from_number", response.data.twilio.from_numbers[0]);
|
||||
}
|
||||
} else if (response.data?.vonage) {
|
||||
setHasExistingConfig(true);
|
||||
setValue("provider", "vonage");
|
||||
setValue("application_id", response.data.vonage.application_id);
|
||||
setValue("private_key", response.data.vonage.private_key);
|
||||
setValue("api_key", response.data.vonage.api_key || "");
|
||||
setValue("api_secret", response.data.vonage.api_secret || "");
|
||||
if (response.data.vonage.from_numbers?.length > 0) {
|
||||
setValue("from_number", response.data.vonage.from_numbers[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
|
|
@ -93,14 +113,26 @@ export default function ConfigureTelephonyPage() {
|
|||
|
||||
try {
|
||||
const accessToken = await getAccessToken();
|
||||
|
||||
// Build the request body based on provider
|
||||
let requestBody: any = {
|
||||
provider: data.provider,
|
||||
from_numbers: [data.from_number],
|
||||
};
|
||||
|
||||
if (data.provider === "twilio") {
|
||||
requestBody.account_sid = data.account_sid;
|
||||
requestBody.auth_token = data.auth_token;
|
||||
} else if (data.provider === "vonage") {
|
||||
requestBody.application_id = data.application_id;
|
||||
requestBody.private_key = data.private_key;
|
||||
requestBody.api_key = data.api_key;
|
||||
requestBody.api_secret = data.api_secret;
|
||||
}
|
||||
|
||||
const response = await saveTelephonyConfigurationApiV1OrganizationsTelephonyConfigPost({
|
||||
headers: { Authorization: `Bearer ${accessToken}` },
|
||||
body: {
|
||||
provider: data.provider,
|
||||
account_sid: data.account_sid,
|
||||
auth_token: data.auth_token,
|
||||
from_numbers: [data.from_number],
|
||||
},
|
||||
body: requestBody,
|
||||
});
|
||||
|
||||
if (response.error) {
|
||||
|
|
@ -178,8 +210,14 @@ export default function ConfigureTelephonyPage() {
|
|||
</SelectTrigger>
|
||||
<SelectContent>
|
||||
<SelectItem value="twilio">Twilio</SelectItem>
|
||||
<SelectItem value="vonage">Vonage</SelectItem>
|
||||
</SelectContent>
|
||||
</Select>
|
||||
{hasExistingConfig && (
|
||||
<p className="text-sm text-amber-600">
|
||||
⚠️ Switching providers will require entering new credentials
|
||||
</p>
|
||||
)}
|
||||
</div>
|
||||
|
||||
{/* Twilio-specific fields */}
|
||||
|
|
@ -250,6 +288,87 @@ export default function ConfigureTelephonyPage() {
|
|||
</>
|
||||
)}
|
||||
|
||||
{/* Vonage-specific fields */}
|
||||
{selectedProvider === "vonage" && (
|
||||
<>
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="application_id">Application ID</Label>
|
||||
<Input
|
||||
id="application_id"
|
||||
placeholder="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
|
||||
{...register("application_id", {
|
||||
required: selectedProvider === "vonage" ? "Application ID is required" : false,
|
||||
})}
|
||||
/>
|
||||
{errors.application_id && (
|
||||
<p className="text-sm text-red-500">
|
||||
{errors.application_id.message}
|
||||
</p>
|
||||
)}
|
||||
</div>
|
||||
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="private_key">Private Key</Label>
|
||||
<textarea
|
||||
id="private_key"
|
||||
className="w-full min-h-[100px] px-3 py-2 text-sm border rounded-md"
|
||||
placeholder="-----BEGIN PRIVATE KEY----- ... -----END PRIVATE KEY-----"
|
||||
{...register("private_key", {
|
||||
required: selectedProvider === "vonage" && !hasExistingConfig
|
||||
? "Private key is required"
|
||||
: false,
|
||||
})}
|
||||
/>
|
||||
{errors.private_key && (
|
||||
<p className="text-sm text-red-500">
|
||||
{errors.private_key.message}
|
||||
</p>
|
||||
)}
|
||||
</div>
|
||||
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="api_key">API Key (Optional)</Label>
|
||||
<Input
|
||||
id="api_key"
|
||||
placeholder="Optional - for some operations"
|
||||
{...register("api_key")}
|
||||
/>
|
||||
</div>
|
||||
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="api_secret">API Secret (Optional)</Label>
|
||||
<Input
|
||||
id="api_secret"
|
||||
type="password"
|
||||
placeholder="Optional - for webhook verification"
|
||||
{...register("api_secret")}
|
||||
/>
|
||||
</div>
|
||||
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="from_number">From Phone Number</Label>
|
||||
<Input
|
||||
id="from_number"
|
||||
autoComplete="tel"
|
||||
placeholder="14155551234 (no + prefix for Vonage)"
|
||||
{...register("from_number", {
|
||||
required: "Phone number is required",
|
||||
pattern: {
|
||||
value: /^[1-9]\d{1,14}$/,
|
||||
message:
|
||||
"Enter a valid phone number without + prefix (e.g., 14155551234)",
|
||||
},
|
||||
})}
|
||||
/>
|
||||
{errors.from_number && (
|
||||
<p className="text-sm text-red-500">
|
||||
{errors.from_number.message}
|
||||
</p>
|
||||
)}
|
||||
</div>
|
||||
</>
|
||||
)}
|
||||
|
||||
<div className="pt-4">
|
||||
<Button
|
||||
type="submit"
|
||||
|
|
|
|||
|
|
@ -117,8 +117,8 @@ const WorkflowHeader = ({ isDirty, workflowName, rfInstance, onRun, workflowId,
|
|||
});
|
||||
|
||||
// If no configuration exists, show configure dialog
|
||||
// Check if Twilio is configured (currently the only supported provider)
|
||||
if (configResponse.error || !configResponse.data?.twilio) {
|
||||
// Check if any telephony provider is configured (Twilio or Vonage)
|
||||
if (configResponse.error || (!configResponse.data?.twilio && !configResponse.data?.vonage)) {
|
||||
setConfigureDialogOpen(true);
|
||||
return;
|
||||
}
|
||||
|
|
@ -153,7 +153,10 @@ const WorkflowHeader = ({ isDirty, workflowName, rfInstance, onRun, workflowId,
|
|||
|
||||
// Configuration exists, proceed with call initiation
|
||||
const response = await initiateCallApiV1TelephonyInitiateCallPost({
|
||||
body: { workflow_id: workflowId },
|
||||
body: {
|
||||
workflow_id: workflowId,
|
||||
phone_number: phoneNumber
|
||||
},
|
||||
headers: { 'Authorization': `Bearer ${accessToken}` },
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -1,9 +1,8 @@
|
|||
// This file is auto-generated by @hey-api/openapi-ts
|
||||
|
||||
import { type ClientOptions as DefaultClientOptions, type Config, createClient, createConfig } from '@hey-api/client-fetch';
|
||||
|
||||
import { createClientConfig } from '../lib/apiClient';
|
||||
import type { ClientOptions } from './types.gen';
|
||||
import { type Config, type ClientOptions as DefaultClientOptions, createClient, createConfig } from '@hey-api/client-fetch';
|
||||
import { createClientConfig } from '../lib/apiClient';
|
||||
|
||||
/**
|
||||
* The `createClientConfig()` function will be called on client initialization
|
||||
|
|
@ -17,4 +16,4 @@ export type CreateClientConfig<T extends DefaultClientOptions = ClientOptions> =
|
|||
|
||||
export const client = createClient(createClientConfig(createConfig<ClientOptions>({
|
||||
baseUrl: 'http://127.0.0.1:8000'
|
||||
})));
|
||||
})));
|
||||
|
|
@ -1,3 +1,3 @@
|
|||
// This file is auto-generated by @hey-api/openapi-ts
|
||||
export * from './sdk.gen';
|
||||
export * from './types.gen';
|
||||
export * from './sdk.gen';
|
||||
File diff suppressed because one or more lines are too long
|
|
@ -261,11 +261,6 @@ export type ImpersonateResponse = {
|
|||
access_token: string;
|
||||
};
|
||||
|
||||
export type InitiateCallRequest = {
|
||||
workflow_id: number;
|
||||
workflow_run_id?: number | null;
|
||||
};
|
||||
|
||||
export type IntegrationResponse = {
|
||||
id: number;
|
||||
integration_id: string;
|
||||
|
|
@ -390,6 +385,7 @@ export type SuperuserWorkflowRunsListResponse = {
|
|||
*/
|
||||
export type TelephonyConfigurationResponse = {
|
||||
twilio?: TwilioConfigurationResponse | null;
|
||||
vonage?: VonageConfigurationResponse | null;
|
||||
};
|
||||
|
||||
export type TestSessionResponse = {
|
||||
|
|
@ -502,6 +498,45 @@ export type ValidationError = {
|
|||
type: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Request schema for Vonage configuration.
|
||||
*/
|
||||
export type VonageConfigurationRequest = {
|
||||
provider?: string;
|
||||
/**
|
||||
* Vonage API Key
|
||||
*/
|
||||
api_key?: string | null;
|
||||
/**
|
||||
* Vonage API Secret
|
||||
*/
|
||||
api_secret?: string | null;
|
||||
/**
|
||||
* Vonage Application ID
|
||||
*/
|
||||
application_id: string;
|
||||
/**
|
||||
* Private key for JWT generation
|
||||
*/
|
||||
private_key: string;
|
||||
/**
|
||||
* List of Vonage phone numbers (without + prefix)
|
||||
*/
|
||||
from_numbers: Array<string>;
|
||||
};
|
||||
|
||||
/**
|
||||
* Response schema for Vonage configuration with masked sensitive fields.
|
||||
*/
|
||||
export type VonageConfigurationResponse = {
|
||||
provider: string;
|
||||
application_id: string;
|
||||
api_key: string | null;
|
||||
api_secret: string | null;
|
||||
private_key: string;
|
||||
from_numbers: Array<string>;
|
||||
};
|
||||
|
||||
export type WorkflowError = {
|
||||
kind: ItemKind;
|
||||
id: string | null;
|
||||
|
|
@ -621,8 +656,19 @@ export type WorkflowTemplateResponse = {
|
|||
created_at: string;
|
||||
};
|
||||
|
||||
export type ApiRoutesTelephonyInitiateCallRequest = {
|
||||
workflow_id: number;
|
||||
workflow_run_id?: number | null;
|
||||
phone_number?: string | null;
|
||||
};
|
||||
|
||||
export type ApiRoutesTwilioInitiateCallRequest = {
|
||||
workflow_id: number;
|
||||
workflow_run_id?: number | null;
|
||||
};
|
||||
|
||||
export type InitiateCallApiV1TelephonyInitiateCallPostData = {
|
||||
body: InitiateCallRequest;
|
||||
body: ApiRoutesTelephonyInitiateCallRequest;
|
||||
headers?: {
|
||||
authorization?: string | null;
|
||||
};
|
||||
|
|
@ -683,8 +729,37 @@ export type HandleStatusCallbackApiV1TelephonyStatusCallbackWorkflowRunIdPostRes
|
|||
200: unknown;
|
||||
};
|
||||
|
||||
export type HandleVonageEventsApiV1TelephonyEventsWorkflowRunIdPostData = {
|
||||
body?: never;
|
||||
path: {
|
||||
workflow_run_id: number;
|
||||
};
|
||||
query?: never;
|
||||
url: '/api/v1/telephony/events/{workflow_run_id}';
|
||||
};
|
||||
|
||||
export type HandleVonageEventsApiV1TelephonyEventsWorkflowRunIdPostErrors = {
|
||||
/**
|
||||
* Not found
|
||||
*/
|
||||
404: unknown;
|
||||
/**
|
||||
* Validation Error
|
||||
*/
|
||||
422: HttpValidationError;
|
||||
};
|
||||
|
||||
export type HandleVonageEventsApiV1TelephonyEventsWorkflowRunIdPostError = HandleVonageEventsApiV1TelephonyEventsWorkflowRunIdPostErrors[keyof HandleVonageEventsApiV1TelephonyEventsWorkflowRunIdPostErrors];
|
||||
|
||||
export type HandleVonageEventsApiV1TelephonyEventsWorkflowRunIdPostResponses = {
|
||||
/**
|
||||
* Successful Response
|
||||
*/
|
||||
200: unknown;
|
||||
};
|
||||
|
||||
export type InitiateCallApiV1TwilioInitiateCallPostData = {
|
||||
body: InitiateCallRequest;
|
||||
body: ApiRoutesTwilioInitiateCallRequest;
|
||||
headers?: {
|
||||
authorization?: string | null;
|
||||
};
|
||||
|
|
@ -2019,7 +2094,9 @@ export type GetTelephonyConfigurationApiV1OrganizationsTelephonyConfigGetData =
|
|||
authorization?: string | null;
|
||||
};
|
||||
path?: never;
|
||||
query?: never;
|
||||
query?: {
|
||||
provider?: string | null;
|
||||
};
|
||||
url: '/api/v1/organizations/telephony-config';
|
||||
};
|
||||
|
||||
|
|
@ -2046,7 +2123,7 @@ export type GetTelephonyConfigurationApiV1OrganizationsTelephonyConfigGetRespons
|
|||
export type GetTelephonyConfigurationApiV1OrganizationsTelephonyConfigGetResponse = GetTelephonyConfigurationApiV1OrganizationsTelephonyConfigGetResponses[keyof GetTelephonyConfigurationApiV1OrganizationsTelephonyConfigGetResponses];
|
||||
|
||||
export type SaveTelephonyConfigurationApiV1OrganizationsTelephonyConfigPostData = {
|
||||
body: TwilioConfigurationRequest;
|
||||
body: TwilioConfigurationRequest | VonageConfigurationRequest;
|
||||
headers?: {
|
||||
authorization?: string | null;
|
||||
};
|
||||
|
|
@ -2871,4 +2948,4 @@ export type HealthApiV1HealthGetResponses = {
|
|||
|
||||
export type ClientOptions = {
|
||||
baseUrl: 'http://127.0.0.1:8000' | (string & {});
|
||||
};
|
||||
};
|
||||
Loading…
Add table
Add a link
Reference in a new issue