diff --git a/api/.env.example b/api/.env.example
index 8803380..1496a87 100644
--- a/api/.env.example
+++ b/api/.env.example
@@ -33,10 +33,8 @@ STACK_AUTH_PROJECT_ID="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
STACK_SECRET_SERVER_KEY="ssk_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
STACK_PUBLISHABLE_CLIENT_KEY="pck_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
-# Twilio Configuration
-TWILIO_ACCOUNT_SID="SKxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
-TWILIO_AUTH_TOKEN="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
-TWILIO_FROM_NUMBER="+1234567890"
+# Telephony Configuration
+# Telephony providers are configured via UI/database only. Navigate to: Settings → Integrations → Telephony
# Tracing and Analytics
ENABLE_TRACING=true
diff --git a/api/Dockerfile b/api/Dockerfile
index 1a69ea8..301bd89 100644
--- a/api/Dockerfile
+++ b/api/Dockerfile
@@ -21,10 +21,11 @@ RUN pip install --user --no-cache-dir -r requirements.txt && \
# Force reinstall of pipecat on every build (cache bust)
ARG CACHEBUST=1
-RUN pip install --user 'git+https://github.com/dograh-hq/pipecat.git@f88c8a0#egg=pipecat-ai[cartesia,deepgram,openai,elevenlabs,groq,google,azure,soundfile,silero,webrtc]' && \
+RUN pip install --user 'git+https://github.com/dograh-hq/pipecat.git@278248a#egg=pipecat-ai[cartesia,deepgram,openai,elevenlabs,groq,google,azure,soundfile,silero,webrtc]' && \
# Clean up pip cache after pipecat installation
rm -rf /root/.cache/pip
+
# Remove unnecessary Python cache files from installed packages
RUN find /root/.local -type f -name '*.pyc' -delete && \
find /root/.local -type d -name '__pycache__' -delete && \
diff --git a/api/alembic/versions/a57d25b75117_add_provider_info_to_cost_info.py b/api/alembic/versions/a57d25b75117_add_provider_info_to_cost_info.py
new file mode 100644
index 0000000..a3895b3
--- /dev/null
+++ b/api/alembic/versions/a57d25b75117_add_provider_info_to_cost_info.py
@@ -0,0 +1,122 @@
+"""add_provider_info_to_cost_info
+
+Revision ID: a57d25b75117
+Revises: 982ec8e434be
+Create Date: 2025-10-21 12:28:06.053318
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+from alembic_postgresql_enum import TableReference
+
+
+# revision identifiers, used by Alembic.
+revision: str = 'a57d25b75117'
+down_revision: Union[str, None] = '982ec8e434be'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ """
+ Add provider info to existing cost_info JSON for backward compatibility.
+ This migration:
+ 1. Adds 'vonage' to workflow_run_mode enum
+ 2. Adds 'provider' field to cost_info for existing records
+ 3. Migrates TWILIO_CONFIGURATION key to TELEPHONY_CONFIGURATION
+ """
+
+ # Add 'vonage' to the workflow_run_mode enum using sync_enum_values like other migrations
+ op.sync_enum_values(
+ enum_schema="public",
+ enum_name="workflow_run_mode",
+ new_values=["twilio", "stasis", "webrtc", "smallwebrtc", "VOICE", "CHAT", "vonage"],
+ affected_columns=[
+ TableReference(
+ table_schema="public", table_name="workflow_runs", column_name="mode"
+ )
+ ],
+ enum_values_to_rename=[],
+ )
+
+ # Update workflow_runs to add provider info based on mode
+ # Use jsonb_set() to add provider field while preserving existing data
+ op.execute("""
+ UPDATE workflow_runs
+ SET cost_info = jsonb_set(
+ CASE
+ WHEN cost_info IS NULL OR cost_info::text = '{}'
+ THEN '{}'::jsonb
+ ELSE cost_info::jsonb
+ END,
+ '{provider}',
+ '"twilio"'::jsonb,
+ true
+ )::json
+ WHERE mode = 'twilio'
+ AND (cost_info IS NULL OR cost_info::text NOT LIKE '%provider%')
+ """)
+
+ op.execute("""
+ UPDATE workflow_runs
+ SET cost_info = jsonb_set(
+ CASE
+ WHEN cost_info IS NULL OR cost_info::text = '{}'
+ THEN '{}'::jsonb
+ ELSE cost_info::jsonb
+ END,
+ '{provider}',
+ '"vonage"'::jsonb,
+ true
+ )::json
+ WHERE mode = 'vonage'
+ AND (cost_info IS NULL OR cost_info::text NOT LIKE '%provider%')
+ """)
+
+ # Simply rename the key from TWILIO_CONFIGURATION to TELEPHONY_CONFIGURATION
+ # Keep the same single-provider format
+ op.execute("""
+ UPDATE organization_configurations
+ SET key = 'TELEPHONY_CONFIGURATION'
+ WHERE key = 'TWILIO_CONFIGURATION';
+ """)
+
+ print("Migration complete: Added vonage to enum, provider info to cost_info, and renamed configuration key")
+
+
+def downgrade() -> None:
+ """
+ Remove provider info and revert key name.
+ Revert enum to previous state (removing 'vonage').
+ """
+
+ # Remove provider field from cost_info while preserving other data
+ op.execute("""
+ UPDATE workflow_runs
+ SET cost_info = (cost_info::jsonb - 'provider')::json
+ WHERE cost_info::text LIKE '%provider%'
+ """)
+
+ # Revert key name
+ op.execute("""
+ UPDATE organization_configurations
+ SET key = 'TWILIO_CONFIGURATION'
+ WHERE key = 'TELEPHONY_CONFIGURATION';
+ """)
+
+ # Revert enum to previous state
+ op.sync_enum_values(
+ enum_schema="public",
+ enum_name="workflow_run_mode",
+ new_values=["twilio", "stasis", "webrtc", "smallwebrtc", "VOICE", "CHAT"],
+ affected_columns=[
+ TableReference(
+ table_schema="public", table_name="workflow_runs", column_name="mode"
+ )
+ ],
+ enum_values_to_rename=[],
+ )
+
+ print("Downgrade complete: Removed provider info and reverted key name")
\ No newline at end of file
diff --git a/api/constants.py b/api/constants.py
index 235c1e0..c87e19f 100644
--- a/api/constants.py
+++ b/api/constants.py
@@ -16,9 +16,6 @@ ENABLE_TRACING = os.getenv("ENABLE_TRACING", "false").lower() == "true"
ENABLE_RNNOISE = os.getenv("ENABLE_RNNOISE", "false").lower() == "true"
BACKEND_API_ENDPOINT = os.getenv("BACKEND_API_ENDPOINT", None)
-TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID", None)
-TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN", None)
-TWILIO_DEFAULT_FROM_NUMBER = os.getenv("TWILIO_FROM_NUMBER", None)
DATABASE_URL = os.environ["DATABASE_URL"]
REDIS_URL = os.environ["REDIS_URL"]
diff --git a/api/enums.py b/api/enums.py
index 7e7e9c6..7175e78 100644
--- a/api/enums.py
+++ b/api/enums.py
@@ -14,6 +14,7 @@ class Environment(Enum):
class WorkflowRunMode(Enum):
TWILIO = "twilio"
+ VONAGE = "vonage"
STASIS = "stasis"
WEBRTC = "webrtc"
SMALLWEBRTC = "smallwebrtc"
@@ -62,7 +63,8 @@ class OrganizationConfigurationKey(Enum):
DISPOSITION_CODE_MAPPING = "DISPOSITION_CODE_MAPPING"
DISPOSITION_MESSAGE_TEMPLATE = "DISPOSITION_MESSAGE_TEMPLATE"
CONCURRENT_CALL_LIMIT = "CONCURRENT_CALL_LIMIT"
- TWILIO_CONFIGURATION = "TWILIO_CONFIGURATION"
+ TELEPHONY_CONFIGURATION = "TELEPHONY_CONFIGURATION" # Stores all providers + active one
+ TWILIO_CONFIGURATION = "TWILIO_CONFIGURATION" # Deprecated - for backward compatibility
class WorkflowStatus(Enum):
diff --git a/api/routes/campaign.py b/api/routes/campaign.py
index ec40757..f8941b0 100644
--- a/api/routes/campaign.py
+++ b/api/routes/campaign.py
@@ -170,10 +170,10 @@ async def start_campaign(
user: UserModel = Depends(get_user),
) -> CampaignResponse:
"""Start campaign execution"""
- # Check if organization has TWILIO_CONFIGURATION configured
+ # Check if organization has TELEPHONY_CONFIGURATION configured
twilio_config = await db_client.get_configuration(
user.selected_organization_id,
- OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
+ OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
)
if not twilio_config or not twilio_config.value:
@@ -278,10 +278,10 @@ async def resume_campaign(
user: UserModel = Depends(get_user),
) -> CampaignResponse:
"""Resume a paused campaign"""
- # Check if organization has TWILIO_CONFIGURATION configured
+ # Check if organization has TELEPHONY_CONFIGURATION configured
twilio_config = await db_client.get_configuration(
user.selected_organization_id,
- OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
+ OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
)
if not twilio_config or not twilio_config.value:
diff --git a/api/routes/main.py b/api/routes/main.py
index 9c6b647..f418512 100644
--- a/api/routes/main.py
+++ b/api/routes/main.py
@@ -11,7 +11,8 @@ from api.routes.rtc_offer import router as rtc_offer_router
from api.routes.s3_signed_url import router as s3_router
from api.routes.service_keys import router as service_keys_router
from api.routes.superuser import router as superuser_router
-from api.routes.twilio import router as twilio_router
+from api.routes.telephony import router as telephony_router
+from api.routes.twilio import router as twilio_router # TODO: Remove after migrating workflow_run_cost.py
from api.routes.user import router as user_router
from api.routes.webrtc_signaling import router as webrtc_signaling_router
from api.routes.workflow import router as workflow_router
@@ -21,7 +22,8 @@ router = APIRouter(
responses={404: {"description": "Not found"}},
)
-router.include_router(twilio_router)
+router.include_router(telephony_router) # New generic telephony routes
+router.include_router(twilio_router) # TODO: Remove after migrating workflow_run_cost.py
router.include_router(rtc_offer_router)
router.include_router(superuser_router)
router.include_router(workflow_router)
diff --git a/api/routes/organization.py b/api/routes/organization.py
index 8b30860..e0df9c1 100644
--- a/api/routes/organization.py
+++ b/api/routes/organization.py
@@ -1,12 +1,16 @@
from fastapi import APIRouter, Depends, HTTPException
+from loguru import logger
from api.db import db_client
from api.db.models import UserModel
from api.enums import OrganizationConfigurationKey
+from typing import Optional, Union
from api.schemas.telephony_config import (
TelephonyConfigurationResponse,
TwilioConfigurationRequest,
TwilioConfigurationResponse,
+ VonageConfigurationRequest,
+ VonageConfigurationResponse,
)
from api.services.auth.depends import get_user
from api.services.configuration.masking import is_mask_of, mask_key
@@ -14,37 +18,85 @@ from api.services.configuration.masking import is_mask_of, mask_key
router = APIRouter(prefix="/organizations", tags=["organizations"])
+# TODO: Make endpoints provider-agnostic
@router.get("/telephony-config", response_model=TelephonyConfigurationResponse)
-async def get_telephony_configuration(user: UserModel = Depends(get_user)):
- """Get telephony configuration for the user's organization with masked sensitive fields."""
+async def get_telephony_configuration(
+ user: UserModel = Depends(get_user),
+ provider: Optional[str] = None # Query param to filter by provider
+):
+ """Get telephony configuration for the user's organization with masked sensitive fields.
+
+ Args:
+ provider: Optional provider filter ('twilio' or 'vonage').
+ If specified, only returns config if it matches the stored provider.
+ """
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
+ # Try new key first, fallback to old for backward compatibility
config = await db_client.get_configuration(
user.selected_organization_id,
- OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
+ OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
)
+
+ # TODO: Remove after telephony provider db migration is complete
+ if not config:
+ config = await db_client.get_configuration(
+ user.selected_organization_id,
+ OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
+ )
if not config or not config.value:
- return TelephonyConfigurationResponse(twilio=None)
+ return TelephonyConfigurationResponse(twilio=None, vonage=None)
- # Mask sensitive fields (account_sid and auth_token) before returning
- account_sid = config.value.get("account_sid", "")
- auth_token = config.value.get("auth_token", "")
+ # Simple single-provider format
+ stored_provider = config.value.get("provider", "twilio")
+
+ # If a specific provider is requested, only return config if it matches
+ if provider and provider != stored_provider:
+ # User is requesting a different provider than what's stored
+ return TelephonyConfigurationResponse(twilio=None, vonage=None)
+
+ if stored_provider == "twilio":
+ # Mask sensitive fields (account_sid and auth_token) before returning
+ account_sid = config.value.get("account_sid", "")
+ auth_token = config.value.get("auth_token", "")
- return TelephonyConfigurationResponse(
- twilio=TwilioConfigurationResponse(
- provider="twilio",
- account_sid=mask_key(account_sid) if account_sid else "",
- auth_token=mask_key(auth_token) if auth_token else "",
- from_numbers=config.value.get("from_numbers", []),
+ return TelephonyConfigurationResponse(
+ twilio=TwilioConfigurationResponse(
+ provider="twilio",
+ account_sid=mask_key(account_sid) if account_sid else "",
+ auth_token=mask_key(auth_token) if auth_token else "",
+ from_numbers=config.value.get("from_numbers", []),
+ ),
+ vonage=None
)
- )
+ elif stored_provider == "vonage":
+ # Mask sensitive fields for Vonage
+ application_id = config.value.get("application_id", "")
+ private_key = config.value.get("private_key", "")
+ api_key = config.value.get("api_key", "")
+ api_secret = config.value.get("api_secret", "")
+
+ return TelephonyConfigurationResponse(
+ twilio=None,
+ vonage=VonageConfigurationResponse(
+ provider="vonage",
+ application_id=application_id, # Not masked, not sensitive
+ private_key=mask_key(private_key) if private_key else "",
+ api_key=mask_key(api_key) if api_key else None,
+ api_secret=mask_key(api_secret) if api_secret else None,
+ from_numbers=config.value.get("from_numbers", []),
+ )
+ )
+ else:
+ return TelephonyConfigurationResponse(twilio=None, vonage=None)
@router.post("/telephony-config")
async def save_telephony_configuration(
- request: TwilioConfigurationRequest, user: UserModel = Depends(get_user)
+ request: Union[TwilioConfigurationRequest, VonageConfigurationRequest],
+ user: UserModel = Depends(get_user)
):
"""Save telephony configuration for the user's organization."""
if not user.selected_organization_id:
@@ -53,33 +105,73 @@ async def save_telephony_configuration(
# Fetch existing configuration to handle masked values
existing_config = await db_client.get_configuration(
user.selected_organization_id,
- OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
+ OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
)
+ if not existing_config:
+ # Check old key for backward compatibility
+ existing_config = await db_client.get_configuration(
+ user.selected_organization_id,
+ OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
+ )
- # Build new configuration
- config_value = {
- "provider": request.provider,
- "account_sid": request.account_sid,
- "auth_token": request.auth_token,
- "from_numbers": request.from_numbers,
- }
+ # Build simple single-provider configuration
+ if request.provider == "twilio":
+ config_value = {
+ "provider": "twilio",
+ "account_sid": request.account_sid,
+ "auth_token": request.auth_token,
+ "from_numbers": request.from_numbers,
+ }
+ elif request.provider == "vonage":
+ config_value = {
+ "provider": "vonage",
+ "application_id": request.application_id,
+ "private_key": request.private_key,
+ "api_key": getattr(request, 'api_key', None),
+ "api_secret": getattr(request, 'api_secret', None),
+ "from_numbers": request.from_numbers,
+ }
+ else:
+ raise HTTPException(status_code=400, detail=f"Unsupported provider: {request.provider}")
- # If incoming values are masked (same as stored masked value), keep the original
+ # Handle masked values - only if same provider
if existing_config and existing_config.value:
- # Check if account_sid is unchanged (masked value matches)
- stored_account_sid = existing_config.value.get("account_sid", "")
- if stored_account_sid and is_mask_of(request.account_sid, stored_account_sid):
- config_value["account_sid"] = stored_account_sid # Keep original
-
- # Check if auth_token is unchanged (masked value matches)
- stored_auth_token = existing_config.value.get("auth_token", "")
- if stored_auth_token and is_mask_of(request.auth_token, stored_auth_token):
- config_value["auth_token"] = stored_auth_token # Keep original
+ existing_provider = existing_config.value.get("provider")
+
+ # Only preserve masked values if it's the same provider
+ if existing_provider == request.provider:
+ if request.provider == "twilio":
+ # Check if account_sid is unchanged (masked value matches)
+ if hasattr(request, 'account_sid') and is_mask_of(request.account_sid, existing_config.value.get("account_sid", "")):
+ config_value["account_sid"] = existing_config.value["account_sid"] # Keep original
+
+ # Check if auth_token is unchanged (masked value matches)
+ if hasattr(request, 'auth_token') and is_mask_of(request.auth_token, existing_config.value.get("auth_token", "")):
+ config_value["auth_token"] = existing_config.value["auth_token"] # Keep original
+
+ elif request.provider == "vonage":
+ # Check if private_key is unchanged (masked value matches)
+ if hasattr(request, 'private_key') and is_mask_of(request.private_key, existing_config.value.get("private_key", "")):
+ config_value["private_key"] = existing_config.value["private_key"] # Keep original
+
+ # Check if api_key is unchanged (masked value matches)
+ if hasattr(request, 'api_key') and request.api_key and is_mask_of(request.api_key, existing_config.value.get("api_key", "")):
+ config_value["api_key"] = existing_config.value["api_key"] # Keep original
+
+ # Check if api_secret is unchanged (masked value matches)
+ if hasattr(request, 'api_secret') and request.api_secret and is_mask_of(request.api_secret, existing_config.value.get("api_secret", "")):
+ config_value["api_secret"] = existing_config.value["api_secret"] # Keep original
+ # Always save to new TELEPHONY_CONFIGURATION key
await db_client.upsert_configuration(
user.selected_organization_id,
- OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
+ OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
config_value,
)
+
+ # If old TWILIO_CONFIGURATION exists, delete it to avoid confusion
+ if existing_config and existing_config.key == OrganizationConfigurationKey.TWILIO_CONFIGURATION.value:
+ # Note: We're migrating from old to new key
+ logger.info(f"Migrated telephony config from TWILIO_CONFIGURATION to TELEPHONY_CONFIGURATION for org {user.selected_organization_id}")
return {"message": "Telephony configuration saved successfully"}
diff --git a/api/routes/telephony.py b/api/routes/telephony.py
new file mode 100644
index 0000000..6b2d833
--- /dev/null
+++ b/api/routes/telephony.py
@@ -0,0 +1,507 @@
+"""
+Generic telephony routes that work with any telephony provider.
+"""
+import json
+import random
+from datetime import UTC, datetime
+from typing import Annotated, Optional
+
+from fastapi import APIRouter, Depends, Form, Header, HTTPException, Request, WebSocket
+from loguru import logger
+from pydantic import BaseModel
+from starlette.responses import HTMLResponse
+
+from api.db import db_client
+from api.db.models import UserModel
+from api.enums import WorkflowRunMode
+from api.services.auth.depends import get_user
+from api.services.campaign.call_dispatcher import campaign_call_dispatcher
+from api.services.campaign.campaign_event_publisher import get_campaign_event_publisher
+from api.services.pipecat.run_pipeline import run_pipeline_twilio, run_pipeline_vonage
+from api.services.telephony.factory import get_telephony_provider
+from api.utils.tunnel import TunnelURLProvider
+from pipecat.utils.context import set_current_run_id
+
+router = APIRouter(prefix="/telephony")
+
+
+class InitiateCallRequest(BaseModel):
+ workflow_id: int
+ workflow_run_id: int | None = None
+ phone_number: str | None = None # Optional phone number to call
+
+
+class StatusCallbackRequest(BaseModel):
+ """Generic status callback that can handle different providers"""
+ # Common fields
+ call_id: str
+ status: str
+ from_number: Optional[str] = None
+ to_number: Optional[str] = None
+ direction: Optional[str] = None
+ duration: Optional[str] = None
+
+ # Provider-specific fields stored as extra
+ extra: dict = {}
+
+ @classmethod
+ def from_twilio(cls, data: dict):
+ """Convert Twilio callback to generic format"""
+ return cls(
+ call_id=data.get("CallSid", ""),
+ status=data.get("CallStatus", ""),
+ from_number=data.get("From"),
+ to_number=data.get("To"),
+ direction=data.get("Direction"),
+ duration=data.get("CallDuration") or data.get("Duration"),
+ extra=data
+ )
+
+ @classmethod
+ def from_vonage(cls, data: dict):
+ """Convert Vonage event to generic format"""
+ # Map Vonage status to common format
+ status_map = {
+ "started": "initiated",
+ "ringing": "ringing",
+ "answered": "answered",
+ "complete": "completed",
+ "failed": "failed",
+ "busy": "busy",
+ "timeout": "no-answer",
+ "rejected": "busy"
+ }
+
+ return cls(
+ call_id=data.get("uuid", ""),
+ status=status_map.get(data.get("status", ""), data.get("status", "")),
+ from_number=data.get("from"),
+ to_number=data.get("to"),
+ direction=data.get("direction"),
+ duration=data.get("duration"),
+ extra=data
+ )
+
+
+@router.post("/initiate-call")
+async def initiate_call(
+ request: InitiateCallRequest, user: UserModel = Depends(get_user)
+):
+ """Initiate a call using the configured telephony provider."""
+
+ # Get the telephony provider for the organization
+ provider = await get_telephony_provider(user.selected_organization_id)
+
+ # Validate provider is configured
+ if not provider.validate_config():
+ raise HTTPException(
+ status_code=400,
+ detail="telephony_not_configured",
+ )
+
+ # Determine the workflow run mode based on provider type
+ from api.services.telephony.providers.twilio_provider import TwilioProvider
+ from api.services.telephony.providers.vonage_provider import VonageProvider
+
+ if isinstance(provider, TwilioProvider):
+ workflow_run_mode = WorkflowRunMode.TWILIO.value
+ elif isinstance(provider, VonageProvider):
+ workflow_run_mode = WorkflowRunMode.VONAGE.value
+ else:
+ # Default to TWILIO for backward compatibility
+ workflow_run_mode = WorkflowRunMode.TWILIO.value
+
+ user_configuration = await db_client.get_user_configurations(user.id)
+
+ # Use phone number from request, or fall back to user configuration
+ phone_number = request.phone_number or user_configuration.test_phone_number
+
+ if not phone_number:
+ raise HTTPException(
+ status_code=400,
+ detail="Phone number must be provided in request or set in user configuration"
+ )
+
+ workflow_run_id = request.workflow_run_id
+
+ if not workflow_run_id:
+ workflow_run_name = f"WR-TEL-{random.randint(1000, 9999)}"
+ workflow_run = await db_client.create_workflow_run(
+ workflow_run_name,
+ request.workflow_id,
+ workflow_run_mode, # Now provider-agnostic
+ initial_context={
+ "phone_number": phone_number,
+ },
+ user_id=user.id,
+ )
+ workflow_run_id = workflow_run.id
+ else:
+ workflow_run = await db_client.get_workflow_run(workflow_run_id, user.id)
+ if not workflow_run:
+ raise HTTPException(status_code=400, detail="Workflow run not found")
+ workflow_run_name = workflow_run.name
+
+ # Construct webhook URL based on provider type
+ backend_endpoint = await TunnelURLProvider.get_tunnel_url()
+
+ # Check provider type to determine webhook endpoint
+ provider_type = getattr(provider, '__class__', None).__name__ if provider else None
+ webhook_endpoint = "ncco" if provider_type == "VonageProvider" else "twiml"
+
+ webhook_url = (
+ f"https://{backend_endpoint}/api/v1/telephony/{webhook_endpoint}"
+ f"?workflow_id={request.workflow_id}"
+ f"&user_id={user.id}"
+ f"&workflow_run_id={workflow_run_id}"
+ f"&organization_id={user.selected_organization_id}"
+ )
+
+ # Initiate call via provider
+ result = await provider.initiate_call(
+ to_number=phone_number,
+ webhook_url=webhook_url,
+ workflow_run_id=workflow_run_id,
+ )
+
+ # Store call UUID for Vonage in workflow run context
+ if provider_type == "VonageProvider" and result and "uuid" in result:
+ await db_client.update_workflow_run(
+ run_id=workflow_run_id,
+ gathered_context={"call_uuid": result["uuid"]}
+ )
+
+ return {
+ "message": f"Call initiated successfully with run name {workflow_run_name}"
+ }
+
+
+@router.post("/twiml", include_in_schema=False)
+async def handle_twiml_webhook(
+ workflow_id: int,
+ user_id: int,
+ workflow_run_id: int,
+ organization_id: int
+):
+ """
+ Handle initial webhook from telephony provider.
+ Returns provider-specific response (e.g., TwiML for Twilio).
+ """
+ # Get provider for organization - exactly like original gets TwilioService
+ provider = await get_telephony_provider(organization_id)
+
+ # Generate provider-specific response (TwiML for Twilio)
+ response_content = await provider.get_webhook_response(
+ workflow_id, user_id, workflow_run_id
+ )
+
+ # Return exactly like original - HTMLResponse with application/xml
+ return HTMLResponse(content=response_content, media_type="application/xml")
+
+
+@router.get("/ncco", include_in_schema=False)
+async def handle_ncco_webhook(
+ workflow_id: int,
+ user_id: int,
+ workflow_run_id: int,
+ organization_id: Optional[int] = None
+):
+ """Handle NCCO (Nexmo Call Control Objects) webhook for Vonage.
+
+ Returns JSON response instead of XML like TwiML.
+ """
+ # Get provider for organization
+ provider = await get_telephony_provider(organization_id or user_id)
+
+ # Generate NCCO response (JSON for Vonage)
+ response_content = await provider.get_webhook_response(
+ workflow_id, user_id, workflow_run_id
+ )
+
+ # Return JSON response for Vonage
+ return json.loads(response_content)
+
+
+@router.websocket("/ws/{workflow_id}/{user_id}/{workflow_run_id}")
+async def websocket_endpoint(
+ websocket: WebSocket, workflow_id: int, user_id: int, workflow_run_id: int
+):
+ """WebSocket endpoint for real-time call handling - supports both Twilio and Vonage."""
+ await websocket.accept()
+
+ try:
+ # set the run context
+ set_current_run_id(workflow_run_id)
+
+ # Peek at the first message to determine provider
+ # Twilio sends JSON with "connected" event
+ # Vonage sends binary audio directly or may send metadata
+ first_msg = await websocket.receive()
+
+ if "text" in first_msg:
+ # Text message - likely Twilio
+ msg = json.loads(first_msg["text"])
+ if msg.get("event") == "connected":
+ # Definitely Twilio - follow Twilio flow
+
+ # "start" – this has everything we need
+ start_msg = await websocket.receive_text()
+ logger.debug(f"Received start message: {start_msg}")
+
+ start_msg = json.loads(start_msg)
+ if start_msg.get("event") != "start":
+ raise RuntimeError("Expected start message second")
+
+ try:
+ stream_sid = start_msg["start"]["streamSid"]
+ call_sid = start_msg["start"]["callSid"]
+ except KeyError:
+ logger.error(
+ "Missing callSID and streamSID in start message. Closing connection."
+ )
+ await websocket.close(code=4400, reason="Missing or bad start message")
+ return
+
+ # Run Twilio pipeline
+ await run_pipeline_twilio(
+ websocket, stream_sid, call_sid, workflow_id, workflow_run_id, user_id
+ )
+ elif msg.get("event") == "websocket:connected":
+ # This is Vonage's initial connection message
+ logger.info(f"Vonage WebSocket connected for workflow_run {workflow_run_id}")
+
+ # Get workflow run to extract call UUID
+ workflow_run = await db_client.get_workflow_run(workflow_run_id)
+ workflow = await db_client.get_workflow(workflow_id)
+
+ # Extract call UUID from workflow run context
+ call_uuid = workflow_run.gathered_context.get("call_uuid") if workflow_run.gathered_context else None
+
+ if not call_uuid:
+ logger.error("No call UUID found for Vonage connection")
+ await websocket.close(code=4400, reason="Missing call UUID")
+ return
+
+ # Run Vonage pipeline
+ await run_pipeline_vonage(
+ websocket,
+ call_uuid,
+ workflow,
+ workflow.organization_id,
+ workflow_id,
+ workflow_run_id,
+ user_id
+ )
+ else:
+ # Unknown provider or format
+ logger.warning(f"Unknown first message format: {msg}")
+
+ elif "bytes" in first_msg:
+ # Binary message - likely Vonage audio
+ # For Vonage, we need to get the call UUID from the workflow run
+ workflow_run = await db_client.get_workflow_run(workflow_run_id)
+ workflow = await db_client.get_workflow(workflow_id)
+
+ # Extract call UUID from workflow run context
+ call_uuid = workflow_run.gathered_context.get("call_uuid") if workflow_run.gathered_context else None
+
+ if not call_uuid:
+ logger.error("No call UUID found for Vonage connection")
+ await websocket.close(code=4400, reason="Missing call UUID")
+ return
+
+ # Run Vonage pipeline
+ await run_pipeline_vonage(
+ websocket,
+ call_uuid,
+ workflow,
+ workflow.organization_id, # Use the actual organization_id from workflow
+ workflow_id,
+ workflow_run_id,
+ user_id
+ )
+
+ except Exception as e:
+ logger.error(f"Error in WebSocket connection: {e}")
+ await websocket.close(1011, "Internal server error")
+
+
+@router.post("/status-callback/{workflow_run_id}")
+async def handle_status_callback(
+ workflow_run_id: int,
+ request: Request,
+ x_twilio_signature: Optional[str] = Header(None),
+):
+ """Handle status callbacks from telephony providers."""
+
+ # Parse form data
+ form_data = await request.form()
+ callback_data = dict(form_data)
+
+ logger.info(
+ f"[run {workflow_run_id}] Received status callback: {json.dumps(callback_data)}"
+ )
+
+ # Get workflow run to find organization
+ workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
+ if not workflow_run:
+ logger.warning(f"Workflow run {workflow_run_id} not found for status callback")
+ return {"status": "ignored", "reason": "workflow_run_not_found"}
+
+ # Get provider for verification (if signature provided)
+ if x_twilio_signature:
+ # Get organization from workflow run
+ workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
+ if workflow:
+ provider = await get_telephony_provider(workflow.organization_id)
+
+ # Verify signature
+ backend_endpoint = await TunnelURLProvider.get_tunnel_url()
+ full_url = f"https://{backend_endpoint}/api/v1/telephony/status-callback/{workflow_run_id}"
+
+ is_valid = await provider.verify_webhook_signature(
+ full_url, callback_data, x_twilio_signature
+ )
+
+ if not is_valid:
+ logger.warning(f"Invalid webhook signature for workflow run {workflow_run_id}")
+ return {"status": "error", "reason": "invalid_signature"}
+
+ # Convert provider-specific callback to generic format
+ # (Currently assumes Twilio format, will be extended for other providers)
+ status_update = StatusCallbackRequest.from_twilio(callback_data)
+
+ # Process the status update
+ await _process_status_update(workflow_run_id, status_update, workflow_run)
+
+ return {"status": "success"}
+
+
+async def _process_status_update(
+ workflow_run_id: int,
+ status: StatusCallbackRequest,
+ workflow_run: any
+):
+ """Process status updates from telephony providers."""
+
+ # Log the status callback
+ twilio_callback_logs = workflow_run.logs.get("twilio_status_callbacks", [])
+ twilio_callback_log = {
+ "status": status.status,
+ "timestamp": datetime.now(UTC).isoformat(),
+ "call_id": status.call_id,
+ "duration": status.duration,
+ **status.extra # Include provider-specific data
+ }
+ twilio_callback_logs.append(twilio_callback_log)
+
+ # Update workflow run logs
+ await db_client.update_workflow_run(
+ run_id=workflow_run_id,
+ logs={"twilio_status_callbacks": twilio_callback_logs},
+ )
+
+ # Handle call completion
+ if status.status == "completed":
+ logger.info(
+ f"[run {workflow_run_id}] Call completed with duration: {status.duration}s"
+ )
+
+ # Release concurrent slot if this was a campaign call
+ if workflow_run.campaign_id:
+ await campaign_call_dispatcher.release_call_slot(workflow_run_id)
+
+ # Mark workflow run as completed
+ await db_client.update_workflow_run(
+ run_id=workflow_run_id, is_completed=True
+ )
+
+ # Publish campaign event if applicable
+ if workflow_run.campaign_id:
+ publisher = await get_campaign_event_publisher()
+ await publisher.publish_call_completed(
+ campaign_id=workflow_run.campaign_id,
+ workflow_run_id=workflow_run_id,
+ queued_run_id=workflow_run.queued_run_id,
+ call_duration=int(status.duration) if status.duration else 0,
+ )
+
+ elif status.status in ["failed", "busy", "no-answer", "canceled"]:
+ logger.warning(f"[run {workflow_run_id}] Call failed with status: {status.status}")
+
+ # Release concurrent slot for terminal statuses if this was a campaign call
+ if workflow_run.campaign_id:
+ await campaign_call_dispatcher.release_call_slot(workflow_run_id)
+
+ # Check if retry is needed for campaign calls (busy/no-answer)
+ if status.status in ["busy", "no-answer"] and workflow_run.campaign_id:
+ publisher = await get_campaign_event_publisher()
+ await publisher.publish_retry_needed(
+ workflow_run_id=workflow_run_id,
+ reason=status.status.replace("-", "_"), # Convert no-answer to no_answer
+ campaign_id=workflow_run.campaign_id,
+ queued_run_id=workflow_run.queued_run_id,
+ )
+
+ # Mark workflow run as completed with failure tags
+ call_tags = workflow_run.gathered_context.get("call_tags", []) if workflow_run.gathered_context else []
+ call_tags.extend(["not_connected", f"telephony_{status.status.lower()}"])
+
+ await db_client.update_workflow_run(
+ run_id=workflow_run_id,
+ is_completed=True,
+ gathered_context={"call_tags": call_tags}
+ )
+
+
+@router.post("/events/{workflow_run_id}")
+async def handle_vonage_events(
+ request: Request,
+ workflow_run_id: int,
+):
+ """Handle Vonage event webhooks.
+
+ Vonage sends all call events to a single endpoint.
+ Events include: started, ringing, answered, complete, failed, etc.
+ """
+ # Parse the event data
+ event_data = await request.json()
+ logger.info(f"[run {workflow_run_id}] Received Vonage event: {event_data}")
+
+ # Get workflow run for processing
+ workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
+ if not workflow_run:
+ logger.error(f"[run {workflow_run_id}] Workflow run not found")
+ return {"status": "error", "message": "Workflow run not found"}
+
+ # If this is a completed call and includes cost info, capture it immediately
+ if event_data.get("status") == "completed":
+ # Vonage sometimes includes price info in the webhook
+ if "price" in event_data or "rate" in event_data:
+ try:
+ if workflow_run.cost_info:
+ # Store immediate cost info if available
+ cost_info = workflow_run.cost_info.copy()
+ if "price" in event_data:
+ cost_info["vonage_webhook_price"] = float(event_data["price"])
+ if "rate" in event_data:
+ cost_info["vonage_webhook_rate"] = float(event_data["rate"])
+ if "duration" in event_data:
+ cost_info["vonage_webhook_duration"] = int(event_data["duration"])
+
+ await db_client.update_workflow_run(
+ run_id=workflow_run_id,
+ cost_info=cost_info
+ )
+ logger.info(f"[run {workflow_run_id}] Captured Vonage cost info from webhook")
+ except Exception as e:
+ logger.error(f"[run {workflow_run_id}] Failed to capture Vonage cost from webhook: {e}")
+
+ # Convert to generic status format
+ status_update = StatusCallbackRequest.from_vonage(event_data)
+
+ # Process the status update
+ await _process_status_update(workflow_run_id, status_update, workflow_run)
+
+ # Return 204 No Content as expected by Vonage
+ return {"status": "ok"}
\ No newline at end of file
diff --git a/api/routes/twilio.py b/api/routes/twilio.py
index 5532e0e..182370d 100644
--- a/api/routes/twilio.py
+++ b/api/routes/twilio.py
@@ -1,3 +1,6 @@
+# TODO: Remove this entire file after migrating workflow_run_cost.py to use telephony abstraction
+# All endpoints here are deprecated - use /api/v1/telephony/* instead
+
import json
import random
from datetime import UTC, datetime
@@ -17,7 +20,8 @@ from api.services.campaign.campaign_event_publisher import (
get_campaign_event_publisher,
)
from api.services.pipecat.run_pipeline import run_pipeline_twilio
-from api.services.telephony.twilio import TwilioService
+from api.services.telephony.factory import get_telephony_provider
+from api.utils.tunnel import TunnelURLProvider
from pipecat.utils.context import set_current_run_id
router = APIRouter(prefix="/twilio")
@@ -45,10 +49,10 @@ class TwilioStatusCallbackRequest(BaseModel):
async def initiate_call(
request: InitiateCallRequest, user: UserModel = Depends(get_user)
):
- # Check if organization has TWILIO_CONFIGURATION configured
+ # Check if organization has TELEPHONY_CONFIGURATION configured
twilio_config = await db_client.get_configuration(
user.selected_organization_id,
- OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
+ OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
)
if not twilio_config or not twilio_config.value:
@@ -80,15 +84,16 @@ async def initiate_call(
workflow_run_name = workflow_run.name
if user_configuration.test_phone_number:
- twilio_service = TwilioService(user.selected_organization_id)
- await twilio_service.initiate_call(
+ # Use new provider pattern instead of legacy TwilioService
+ provider = await get_telephony_provider(user.selected_organization_id)
+
+ # Generate webhook URL for Twilio
+ backend_endpoint = await TunnelURLProvider.get_tunnel_url()
+ webhook_url = f"https://{backend_endpoint}/api/v1/twilio/twiml?workflow_id={request.workflow_id}&user_id={user.id}&workflow_run_id={workflow_run_id}&organization_id={user.selected_organization_id}"
+
+ await provider.initiate_call(
to_number=user_configuration.test_phone_number,
- url_args={
- "workflow_id": request.workflow_id,
- "user_id": user.id,
- "workflow_run_id": workflow_run_id,
- "organization_id": user.selected_organization_id,
- },
+ webhook_url=webhook_url,
workflow_run_id=workflow_run_id,
)
return {
@@ -102,7 +107,9 @@ async def initiate_call(
async def start_call(
workflow_id: int, user_id: int, workflow_run_id: int, organization_id: int
):
- twiml_content = await TwilioService(organization_id).get_start_call_twiml(
+ # Use new provider pattern for TwiML generation
+ provider = await get_telephony_provider(organization_id)
+ twiml_content = await provider.get_webhook_response(
workflow_id, user_id, workflow_run_id
)
return HTMLResponse(content=twiml_content, media_type="application/xml")
diff --git a/api/schemas/telephony_config.py b/api/schemas/telephony_config.py
index 907a8c4..f66be66 100644
--- a/api/schemas/telephony_config.py
+++ b/api/schemas/telephony_config.py
@@ -1,7 +1,8 @@
-from typing import List
+from typing import List, Optional
from pydantic import BaseModel, Field
+# TODO: Make schemas provider-agnostic
class TwilioConfigurationRequest(BaseModel):
"""Request schema for Twilio configuration."""
@@ -23,7 +24,32 @@ class TwilioConfigurationResponse(BaseModel):
from_numbers: List[str]
+class VonageConfigurationRequest(BaseModel):
+ """Request schema for Vonage configuration."""
+
+ provider: str = Field(default="vonage")
+ api_key: Optional[str] = Field(None, description="Vonage API Key")
+ api_secret: Optional[str] = Field(None, description="Vonage API Secret")
+ application_id: str = Field(..., description="Vonage Application ID")
+ private_key: str = Field(..., description="Private key for JWT generation")
+ from_numbers: List[str] = Field(
+ ..., min_length=1, description="List of Vonage phone numbers (without + prefix)"
+ )
+
+
+class VonageConfigurationResponse(BaseModel):
+ """Response schema for Vonage configuration with masked sensitive fields."""
+
+ provider: str
+ application_id: str # Not sensitive, can show full
+ api_key: Optional[str] # Masked if present
+ api_secret: Optional[str] # Masked if present
+ private_key: str # Masked (shows only if configured)
+ from_numbers: List[str]
+
+
class TelephonyConfigurationResponse(BaseModel):
"""Top-level telephony configuration response."""
- twilio: TwilioConfigurationResponse | None = None
+ twilio: Optional[TwilioConfigurationResponse] = None
+ vonage: Optional[VonageConfigurationResponse] = None
diff --git a/api/services/campaign/call_dispatcher.py b/api/services/campaign/call_dispatcher.py
index 6488ae2..f42281f 100644
--- a/api/services/campaign/call_dispatcher.py
+++ b/api/services/campaign/call_dispatcher.py
@@ -9,7 +9,9 @@ from api.db import db_client
from api.db.models import QueuedRunModel, WorkflowRunModel
from api.enums import OrganizationConfigurationKey, WorkflowRunMode
from api.services.campaign.rate_limiter import rate_limiter
-from api.services.telephony.twilio import TwilioService
+from api.services.telephony.factory import get_telephony_provider
+from api.services.telephony.base import TelephonyProvider
+from api.utils.tunnel import TunnelURLProvider
class CampaignCallDispatcher:
@@ -18,9 +20,9 @@ class CampaignCallDispatcher:
def __init__(self):
self.default_concurrent_limit = 20
- def get_twilio_service(self, organization_id: int) -> TwilioService:
- """Get TwilioService instance for specific organization"""
- return TwilioService(organization_id)
+ async def get_telephony_provider(self, organization_id: int) -> TelephonyProvider:
+ """Get telephony provider instance for specific organization"""
+ return await get_telephony_provider(organization_id)
async def get_org_concurrent_limit(self, organization_id: int) -> int:
"""Get the concurrent call limit for an organization."""
@@ -219,19 +221,25 @@ class CampaignCallDispatcher:
},
)
- # Initiate call via Twilio
+ # Initiate call via telephony provider
try:
- twilio_service = self.get_twilio_service(campaign.organization_id)
- call_result = await twilio_service.initiate_call(
+ provider = await self.get_telephony_provider(campaign.organization_id)
+
+ # Construct webhook URL with parameters
+ backend_endpoint = await TunnelURLProvider.get_tunnel_url()
+ webhook_url = (
+ f"https://{backend_endpoint}/api/v1/telephony/twiml"
+ f"?workflow_id={campaign.workflow_id}"
+ f"&user_id={campaign.created_by}"
+ f"&workflow_run_id={workflow_run.id}"
+ f"&campaign_id={campaign.id}"
+ f"&organization_id={campaign.organization_id}"
+ )
+
+ call_result = await provider.initiate_call(
to_number=phone_number,
+ webhook_url=webhook_url,
workflow_run_id=workflow_run.id,
- url_args={
- "workflow_id": campaign.workflow_id,
- "user_id": campaign.created_by,
- "workflow_run_id": workflow_run.id,
- "campaign_id": campaign.id,
- "organization_id": campaign.organization_id,
- },
)
logger.info(
diff --git a/api/services/pipecat/audio_config.py b/api/services/pipecat/audio_config.py
index b966a62..42880ba 100644
--- a/api/services/pipecat/audio_config.py
+++ b/api/services/pipecat/audio_config.py
@@ -80,7 +80,7 @@ def create_audio_config(transport_type: str) -> AudioConfig:
"""Create audio configuration based on transport type.
Args:
- transport_type: Type of transport ("webrtc", "twilio", "stasis")
+ transport_type: Type of transport ("webrtc", "twilio", "vonage", "stasis")
Returns:
AudioConfig instance with appropriate settings
@@ -93,6 +93,15 @@ def create_audio_config(transport_type: str) -> AudioConfig:
pipeline_sample_rate=8000, # Keep at 8kHz to avoid resampling
buffer_size_seconds=1.0,
)
+ elif transport_type == WorkflowRunMode.VONAGE.value:
+ # Vonage uses 16kHz Linear PCM
+ return AudioConfig(
+ transport_in_sample_rate=16000,
+ transport_out_sample_rate=16000,
+ vad_sample_rate=16000, # Use matching VAD rate
+ pipeline_sample_rate=16000, # Keep at 16kHz to avoid resampling
+ buffer_size_seconds=1.0,
+ )
elif transport_type in [
WorkflowRunMode.WEBRTC.value,
WorkflowRunMode.SMALLWEBRTC.value,
diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py
index 768817a..ce981be 100644
--- a/api/services/pipecat/run_pipeline.py
+++ b/api/services/pipecat/run_pipeline.py
@@ -4,6 +4,7 @@ from fastapi import HTTPException, WebSocket
from loguru import logger
from api.db import db_client
+from api.db.models import WorkflowModel
from api.enums import WorkflowRunMode
from api.services.pipecat.audio_config import AudioConfig, create_audio_config
from api.services.pipecat.engine_pre_aggregator_processor import (
@@ -33,6 +34,7 @@ from api.services.pipecat.tracing_config import setup_pipeline_tracing
from api.services.pipecat.transport_setup import (
create_stasis_transport,
create_twilio_transport,
+ create_vonage_transport,
create_webrtc_transport,
)
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
@@ -70,7 +72,7 @@ async def run_pipeline_twilio(
set_current_run_id(workflow_run_id)
# Store Twilio call SID in cost_info for later cost calculation
- cost_info = {"twilio_call_sid": call_sid}
+ cost_info = {"twilio_call_sid": call_sid, "provider": "twilio"}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
# Get workflow to extract all pipeline configurations
@@ -107,6 +109,69 @@ async def run_pipeline_twilio(
)
+async def run_pipeline_vonage(
+ websocket_client,
+ call_uuid: str,
+ workflow: WorkflowModel,
+ organization_id: int,
+ workflow_id: int,
+ workflow_run_id: int,
+ user_id: int,
+):
+ """Run pipeline for Vonage WebSocket connections.
+
+ Vonage uses raw PCM audio over WebSocket instead of base64-encoded μ-law.
+ The audio is transmitted as binary frames at 16kHz by default.
+ """
+ logger.info(f"Starting Vonage pipeline for workflow run {workflow_run_id}")
+ set_current_run_id(workflow_run_id)
+
+ # Store Vonage call UUID in cost_info for later cost calculation
+ cost_info = {"vonage_call_uuid": call_uuid, "provider": "vonage"}
+ await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
+
+ # Extract VAD and ambient noise config from workflow
+ vad_config = None
+ ambient_noise_config = None
+ if workflow and workflow.workflow_configurations:
+ if "vad_configuration" in workflow.workflow_configurations:
+ vad_config = workflow.workflow_configurations["vad_configuration"]
+ if "ambient_noise_configuration" in workflow.workflow_configurations:
+ ambient_noise_config = workflow.workflow_configurations["ambient_noise_configuration"]
+
+ try:
+ # Setup audio config for Vonage using the centralized config
+ audio_config = create_audio_config(WorkflowRunMode.VONAGE.value)
+
+ # Create Vonage transport
+ transport = await create_vonage_transport(
+ websocket_client,
+ call_uuid,
+ workflow_run_id,
+ audio_config,
+ organization_id,
+ vad_config,
+ ambient_noise_config,
+ )
+
+ # No special handshake needed for Vonage
+ # Audio streaming starts immediately
+
+ # Run the pipeline (same as Twilio/WebRTC)
+ await _run_pipeline(
+ transport,
+ workflow_id,
+ workflow_run_id,
+ user_id,
+ call_context_vars={},
+ audio_config=audio_config,
+ )
+
+ except Exception as e:
+ logger.error(f"Error in Vonage pipeline: {e}")
+ raise
+
+
async def run_pipeline_smallwebrtc(
webrtc_connection: SmallWebRTCConnection,
workflow_id: int,
diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py
index 7c96c51..480cf67 100644
--- a/api/services/pipecat/transport_setup.py
+++ b/api/services/pipecat/transport_setup.py
@@ -22,6 +22,7 @@ from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.vad.silero import SileroVADAnalyzer, VADParams
from pipecat.serializers.twilio import TwilioFrameSerializer
+from pipecat.serializers.vonage import VonageFrameSerializer
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
@@ -85,7 +86,7 @@ async def create_twilio_transport(
# Fetch Twilio credentials from organization config
config = await db_client.get_configuration(
- organization_id, OrganizationConfigurationKey.TWILIO_CONFIGURATION.value
+ organization_id, OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value
)
if not config or not config.value:
@@ -151,6 +152,86 @@ async def create_twilio_transport(
)
+async def create_vonage_transport(
+ websocket_client,
+ call_uuid: str,
+ workflow_run_id: int,
+ audio_config: AudioConfig,
+ organization_id: int,
+ vad_config: dict | None = None,
+ ambient_noise_config: dict | None = None,
+):
+ """Create a transport for Vonage connections"""
+
+ # Use the factory to load config from database
+ from api.services.telephony.factory import load_telephony_config
+ config = await load_telephony_config(organization_id)
+
+ if config.get("provider") != "vonage":
+ raise ValueError(f"Expected Vonage provider, got {config.get('provider')}")
+
+ application_id = config.get("application_id")
+ private_key = config.get("private_key")
+
+ if not application_id or not private_key:
+ raise ValueError(
+ f"Incomplete Vonage configuration for organization {organization_id}"
+ )
+
+ turn_analyzer = create_turn_analyzer(workflow_run_id, audio_config)
+
+ serializer = VonageFrameSerializer(
+ call_uuid=call_uuid,
+ application_id=application_id,
+ private_key=private_key,
+ params=VonageFrameSerializer.InputParams(
+ vonage_sample_rate=audio_config.transport_in_sample_rate,
+ sample_rate=audio_config.pipeline_sample_rate
+ )
+ )
+
+ # Important: Vonage uses binary WebSocket mode, not text
+ return FastAPIWebsocketTransport(
+ websocket=websocket_client,
+ params=FastAPIWebsocketParams(
+ audio_in_enabled=True,
+ audio_out_enabled=True,
+ audio_in_sample_rate=audio_config.transport_in_sample_rate,
+ audio_out_sample_rate=audio_config.transport_out_sample_rate,
+ vad_analyzer=(
+ SileroVADAnalyzer(
+ params=VADParams(
+ confidence=vad_config.get("confidence", 0.7),
+ start_secs=vad_config.get("start_seconds", 0.4),
+ stop_secs=vad_config.get("stop_seconds", 0.8),
+ min_volume=vad_config.get("minimum_volume", 0.6),
+ )
+ )
+ if vad_config
+ else SileroVADAnalyzer()
+ ),
+ audio_out_mixer=(
+ SoundfileMixer(
+ sound_files={
+ "office": APP_ROOT_DIR
+ / "assets"
+ / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
+ },
+ default_sound="office",
+ volume=ambient_noise_config.get("volume", 0.3),
+ )
+ if ambient_noise_config and ambient_noise_config.get("enabled", False)
+ else SilenceAudioMixer()
+ ),
+ turn_analyzer=turn_analyzer,
+ serializer=serializer,
+ audio_in_filter=RNNoiseFilter(library_path=librnnoise_path)
+ if ENABLE_RNNOISE
+ else None,
+ ),
+ )
+
+
def create_webrtc_transport(
webrtc_connection: SmallWebRTCConnection,
workflow_run_id: int,
diff --git a/api/services/telephony/README.md b/api/services/telephony/README.md
new file mode 100644
index 0000000..812b864
--- /dev/null
+++ b/api/services/telephony/README.md
@@ -0,0 +1,167 @@
+# Telephony Provider Implementation
+
+This module implements the telephony provider abstraction for Dograh AI. For user-facing documentation, see the [Mintlify docs](https://docs.dograh.com/integrations/telephony/overview).
+
+## Architecture
+
+```
+Business Logic → TelephonyProvider (Interface) → Concrete Provider (Twilio, Vonage, etc.)
+```
+
+## Developer Quick Reference
+
+### Using the Provider in Code
+
+```python
+from api.services.telephony.factory import get_telephony_provider
+
+# Get provider based on organization config
+provider = await get_telephony_provider(organization_id)
+
+# Initiate a call
+result = await provider.initiate_call(
+ to_number="+1987654321",
+ webhook_url="https://your-app.com/webhook",
+ workflow_run_id=123
+)
+```
+
+## File Structure
+
+```
+telephony/
+├── __init__.py
+├── base.py # Abstract TelephonyProvider interface
+├── factory.py # Provider creation and config loading
+├── providers/
+│ ├── __init__.py
+│ ├── twilio_provider.py # Twilio implementation
+│ └── vonage_provider.py # Vonage implementation
+├── twilio.py # Legacy (removed, use factory instead)
+└── README.md # This file
+```
+
+## Implementing a New Provider
+
+See the [Custom Provider Guide](https://docs.dograh.com/integrations/telephony/custom) in the documentation for detailed implementation instructions.
+
+Quick checklist:
+1. Create `providers/your_provider.py` implementing `TelephonyProvider`
+2. Update `factory.py` to include your provider
+3. Write unit tests
+4. Update documentation
+
+## Key Interfaces
+
+```python
+class TelephonyProvider(ABC):
+ @abstractmethod
+ async def initiate_call(self, to_number: str, webhook_url: str, workflow_run_id: Optional[int] = None, **kwargs: Any) -> Dict[str, Any]
+
+ @abstractmethod
+ async def get_call_status(self, call_id: str) -> Dict[str, Any]
+
+ @abstractmethod
+ async def get_available_phone_numbers(self) -> List[str]
+
+ @abstractmethod
+ def validate_config(self) -> bool
+
+ @abstractmethod
+ async def verify_webhook_signature(self, url: str, params: Dict[str, Any], signature: str) -> bool
+
+ @abstractmethod
+ async def get_webhook_response(self, workflow_id: int, user_id: int, workflow_run_id: int) -> str
+```
+
+## Configuration Loading
+
+The `factory.py` loads configuration from the database:
+
+**Both Saas and OSS Modes**: Database configuration via UI
+ ```python
+ # Loaded from organization_configuration table
+ key: "TELEPHONY_CONFIGURATION"
+ value: {
+ "provider": "twilio", # or "vonage"
+ "account_sid": "xxx", # for Twilio
+ "auth_token": "xxx", # for Twilio
+ "application_id": "xxx", # for Vonage
+ "private_key": "xxx", # for Vonage
+ "from_numbers": [...]
+ }
+ ```
+
+## Testing
+
+### Unit Testing with Mock Provider
+
+```python
+class MockProvider(TelephonyProvider):
+ async def initiate_call(self, to_number, webhook_url, **kwargs):
+ return {"call_id": "mock_123", "status": "initiated"}
+
+ async def get_call_status(self, call_id):
+ return {"call_id": call_id, "status": "completed"}
+
+ # Implement other required methods...
+
+# In tests
+@patch('api.services.telephony.factory.get_telephony_provider')
+async def test_call_initiation(mock_get_provider):
+ mock_get_provider.return_value = MockProvider()
+ # Test your business logic
+```
+
+### Integration Testing
+
+Run against actual providers in development:
+
+1. Configure your provider through the UI:
+ - Navigate to Settings → Integrations → Telephony
+ - Select your provider (Twilio or Vonage)
+ - Enter test credentials
+ - Save configuration
+
+2. Run integration tests:
+```bash
+pytest tests/integration/test_telephony.py
+```
+
+## Migration Notes
+
+### From Direct TwilioService Usage
+
+Old code:
+```python
+from api.services.telephony.twilio import TwilioService
+service = TwilioService(org_id)
+await service.initiate_call(...)
+```
+
+New code:
+```python
+from api.services.telephony.factory import get_telephony_provider
+provider = await get_telephony_provider(org_id)
+await provider.initiate_call(...)
+```
+
+### Backward Compatibility
+
+- Old `/api/v1/twilio/*` endpoints still work (redirect to `/api/v1/telephony/*`)
+- `TwilioService` class remains for legacy code
+- Database configuration key `TWILIO_CONFIGURATION` unchanged
+
+## Common Issues
+
+1. **Import Error**: Always import from `factory`, not directly from providers
+2. **Config Not Found**: Check database configuration via UI
+3. **Signature Verification**: Ensure auth tokens match between provider and config
+4. **WebSocket Issues**: Verify audio format compatibility (MULAW for Twilio)
+
+## Related Documentation
+
+- [User Documentation](https://docs.dograh.com/integrations/telephony/overview)
+- [Twilio Integration](https://docs.dograh.com/integrations/telephony/twilio)
+- [Custom Providers](https://docs.dograh.com/integrations/telephony/custom)
+- [Webhooks Guide](https://docs.dograh.com/integrations/telephony/webhooks)
\ No newline at end of file
diff --git a/api/services/telephony/base.py b/api/services/telephony/base.py
new file mode 100644
index 0000000..28b55e3
--- /dev/null
+++ b/api/services/telephony/base.py
@@ -0,0 +1,120 @@
+"""
+Base telephony provider interface for abstracting telephony services.
+This allows easy switching between different providers (Twilio, Vonage, etc.)
+while keeping business logic decoupled from specific implementations.
+"""
+from abc import ABC, abstractmethod
+from typing import Any, Dict, List, Optional
+
+
+class TelephonyProvider(ABC):
+ """
+ Abstract base class for telephony providers.
+ All telephony providers must implement these core methods.
+ """
+
+ @abstractmethod
+ async def initiate_call(
+ self,
+ to_number: str,
+ webhook_url: str,
+ workflow_run_id: Optional[int] = None,
+ **kwargs: Any,
+ ) -> Dict[str, Any]:
+ """
+ Initiate an outbound call.
+
+ Args:
+ to_number: The destination phone number
+ webhook_url: The URL to receive call events
+ workflow_run_id: Optional workflow run ID for tracking
+ **kwargs: Provider-specific additional parameters
+
+ Returns:
+ Dict containing call details (provider-specific format)
+ """
+ pass
+
+ @abstractmethod
+ async def get_call_status(self, call_id: str) -> Dict[str, Any]:
+ """
+ Get the current status of a call.
+
+ Args:
+ call_id: The provider-specific call identifier
+
+ Returns:
+ Dict containing call status information
+ """
+ pass
+
+ @abstractmethod
+ async def get_available_phone_numbers(self) -> List[str]:
+ """
+ Get list of available phone numbers for this provider.
+
+ Returns:
+ List of phone numbers that can be used for outbound calls
+ """
+ pass
+
+ @abstractmethod
+ def validate_config(self) -> bool:
+ """
+ Validate that the provider is properly configured.
+
+ Returns:
+ True if configuration is valid, False otherwise
+ """
+ pass
+
+ @abstractmethod
+ async def verify_webhook_signature(
+ self, url: str, params: Dict[str, Any], signature: str
+ ) -> bool:
+ """
+ Verify webhook signature for security.
+
+ Args:
+ url: The webhook URL
+ params: The webhook parameters
+ signature: The signature to verify
+
+ Returns:
+ True if signature is valid, False otherwise
+ """
+ pass
+
+ @abstractmethod
+ async def get_webhook_response(
+ self, workflow_id: int, user_id: int, workflow_run_id: int
+ ) -> str:
+ """
+ Generate the initial webhook response for starting a call session.
+
+ Args:
+ workflow_id: The workflow ID
+ user_id: The user ID
+ workflow_run_id: The workflow run ID
+
+ Returns:
+ Provider-specific response (e.g., TwiML for Twilio)
+ """
+ pass
+
+ @abstractmethod
+ async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
+ """
+ Get cost information for a completed call.
+
+ Args:
+ call_id: Provider-specific call identifier (SID for Twilio, UUID for Vonage)
+
+ Returns:
+ Dict containing:
+ - cost_usd: The cost in USD as float
+ - duration: Call duration in seconds
+ - status: Call completion status
+ - raw_response: Full provider response for debugging
+ """
+ pass
\ No newline at end of file
diff --git a/api/services/telephony/factory.py b/api/services/telephony/factory.py
new file mode 100644
index 0000000..983a475
--- /dev/null
+++ b/api/services/telephony/factory.py
@@ -0,0 +1,109 @@
+"""
+Factory for creating telephony providers.
+Handles configuration loading from environment (OSS) or database (SaaS).
+The providers themselves don't know or care where config comes from.
+"""
+import os
+from typing import Any, Dict, Optional
+
+from loguru import logger
+
+from api.db import db_client
+from api.enums import OrganizationConfigurationKey
+from api.services.telephony.base import TelephonyProvider
+from api.services.telephony.providers.twilio_provider import TwilioProvider
+from api.services.telephony.providers.vonage_provider import VonageProvider
+
+
+async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
+ """
+ Load telephony configuration from database.
+
+ Args:
+ organization_id: Organization ID for database config
+
+ Returns:
+ Configuration dictionary with provider type and credentials
+
+ Raises:
+ ValueError: If no configuration found for the organization
+ """
+ if not organization_id:
+ raise ValueError("Organization ID is required to load telephony configuration")
+
+ logger.debug(f"Loading telephony config from database for org {organization_id}")
+
+ # Try new key first
+ config = await db_client.get_configuration(
+ organization_id,
+ OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
+ )
+
+ # Fallback to old key for backward compatibility
+ if not config:
+ config = await db_client.get_configuration(
+ organization_id,
+ OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
+ )
+
+ if config and config.value:
+ # Simple single-provider format
+ provider = config.value.get("provider", "twilio")
+
+ if provider == "twilio":
+ return {
+ "provider": "twilio",
+ "account_sid": config.value.get("account_sid"),
+ "auth_token": config.value.get("auth_token"),
+ "from_numbers": config.value.get("from_numbers", [])
+ }
+ elif provider == "vonage":
+ return {
+ "provider": "vonage",
+ "application_id": config.value.get("application_id"),
+ "private_key": config.value.get("private_key"),
+ "api_key": config.value.get("api_key"),
+ "api_secret": config.value.get("api_secret"),
+ "from_numbers": config.value.get("from_numbers", [])
+ }
+ else:
+ raise ValueError(f"Unknown provider in config: {provider}")
+
+ raise ValueError(f"No telephony configuration found for organization {organization_id}")
+
+
+async def get_telephony_provider(
+ organization_id: int
+) -> TelephonyProvider:
+ """
+ Factory function to create telephony providers.
+
+ Args:
+ organization_id: Organization ID (required)
+
+ Returns:
+ Configured telephony provider instance
+
+ Raises:
+ ValueError: If provider type is unknown or configuration is invalid
+ """
+ # Load configuration from appropriate source
+ config = await load_telephony_config(organization_id)
+
+ provider_type = config.get("provider", "twilio")
+ logger.info(f"Creating {provider_type} telephony provider")
+
+ # Create provider instance with configuration
+ # Provider doesn't know or care if config came from env or database
+ if provider_type == "twilio":
+ return TwilioProvider(config)
+
+ elif provider_type == "vonage":
+ return VonageProvider(config)
+
+ # Future providers can be added here
+ # elif provider_type == "plivo":
+ # return PlivoProvider(config)
+
+ else:
+ raise ValueError(f"Unknown telephony provider: {provider_type}")
\ No newline at end of file
diff --git a/api/services/telephony/providers/__init__.py b/api/services/telephony/providers/__init__.py
new file mode 100644
index 0000000..5c8985e
--- /dev/null
+++ b/api/services/telephony/providers/__init__.py
@@ -0,0 +1 @@
+# Telephony provider implementations
\ No newline at end of file
diff --git a/api/services/telephony/providers/twilio_provider.py b/api/services/telephony/providers/twilio_provider.py
new file mode 100644
index 0000000..261e0d6
--- /dev/null
+++ b/api/services/telephony/providers/twilio_provider.py
@@ -0,0 +1,204 @@
+"""
+Twilio implementation of the TelephonyProvider interface.
+"""
+import random
+from typing import Any, Dict, List, Optional
+
+import aiohttp
+from loguru import logger
+from twilio.request_validator import RequestValidator
+
+from api.services.telephony.base import TelephonyProvider
+from api.utils.tunnel import TunnelURLProvider
+
+
+class TwilioProvider(TelephonyProvider):
+ """
+ Twilio implementation of TelephonyProvider.
+ Accepts configuration and works the same regardless of OSS/SaaS mode.
+ """
+
+ def __init__(self, config: Dict[str, Any]):
+ """
+ Initialize TwilioProvider with configuration.
+
+ Args:
+ config: Dictionary containing:
+ - account_sid: Twilio Account SID
+ - auth_token: Twilio Auth Token
+ - from_numbers: List of phone numbers to use
+ """
+ self.account_sid = config.get("account_sid")
+ self.auth_token = config.get("auth_token")
+ self.from_numbers = config.get("from_numbers", [])
+
+ # Handle both single number (string) and multiple numbers (list)
+ if isinstance(self.from_numbers, str):
+ self.from_numbers = [self.from_numbers]
+
+ self.base_url = f"https://api.twilio.com/2010-04-01/Accounts/{self.account_sid}"
+
+ async def initiate_call(
+ self,
+ to_number: str,
+ webhook_url: str,
+ workflow_run_id: Optional[int] = None,
+ **kwargs: Any,
+ ) -> Dict[str, Any]:
+ """
+ Initiate an outbound call via Twilio.
+ """
+ if not self.validate_config():
+ raise ValueError("Twilio provider not properly configured")
+
+ endpoint = f"{self.base_url}/Calls.json"
+
+ # Select a random phone number
+ from_number = random.choice(self.from_numbers)
+ logger.info(f"Selected phone number {from_number} for outbound call")
+
+ # Prepare call data
+ data = {
+ "To": to_number,
+ "From": from_number,
+ "Url": webhook_url
+ }
+
+ # Add status callback if workflow_run_id provided
+ if workflow_run_id:
+ backend_endpoint = await TunnelURLProvider.get_tunnel_url()
+ callback_url = f"https://{backend_endpoint}/api/v1/telephony/status-callback/{workflow_run_id}"
+ data.update({
+ "StatusCallback": callback_url,
+ "StatusCallbackEvent": ["initiated", "ringing", "answered", "completed"],
+ "StatusCallbackMethod": "POST"
+ })
+
+ # Add any additional kwargs
+ data.update(kwargs)
+
+ # Make the API request
+ async with aiohttp.ClientSession() as session:
+ auth = aiohttp.BasicAuth(self.account_sid, self.auth_token)
+ async with session.post(endpoint, data=data, auth=auth) as response:
+ if response.status != 201:
+ error_data = await response.json()
+ raise Exception(f"Failed to initiate call: {error_data}")
+
+ return await response.json()
+
+ async def get_call_status(self, call_id: str) -> Dict[str, Any]:
+ """
+ Get the current status of a Twilio call.
+ """
+ if not self.validate_config():
+ raise ValueError("Twilio provider not properly configured")
+
+ endpoint = f"{self.base_url}/Calls/{call_id}.json"
+
+ async with aiohttp.ClientSession() as session:
+ auth = aiohttp.BasicAuth(self.account_sid, self.auth_token)
+ async with session.get(endpoint, auth=auth) as response:
+ if response.status != 200:
+ error_data = await response.json()
+ raise Exception(f"Failed to get call status: {error_data}")
+
+ return await response.json()
+
+ async def get_available_phone_numbers(self) -> List[str]:
+ """
+ Get list of available Twilio phone numbers.
+ """
+ return self.from_numbers
+
+ def validate_config(self) -> bool:
+ """
+ Validate Twilio configuration.
+ """
+ return bool(
+ self.account_sid and
+ self.auth_token and
+ self.from_numbers
+ )
+
+ async def verify_webhook_signature(
+ self, url: str, params: Dict[str, Any], signature: str
+ ) -> bool:
+ """
+ Verify Twilio webhook signature for security.
+ """
+ if not self.auth_token:
+ logger.error("No auth token available for webhook signature verification")
+ return False
+
+ validator = RequestValidator(self.auth_token)
+ return validator.validate(url, params, signature)
+
+ async def get_webhook_response(
+ self, workflow_id: int, user_id: int, workflow_run_id: int
+ ) -> str:
+ """
+ Generate TwiML response for starting a call session.
+ """
+ backend_endpoint = await TunnelURLProvider.get_tunnel_url()
+
+ twiml_content = f"""
+
+ ⚠️ Switching providers will require entering new credentials +
+ )} {/* Twilio-specific fields */} @@ -249,6 +288,87 @@ export default function ConfigureTelephonyPage() { > )} + {/* Vonage-specific fields */} + {selectedProvider === "vonage" && ( + <> ++ {errors.application_id.message} +
+ )} ++ {errors.private_key.message} +
+ )} ++ {errors.from_number.message} +
+ )} +