mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
* feat: transfer calls with Asterisk * chore: format code with pre-commit script * chore: refactor code * refactor: add call strategies, cleanup transfer events * fix: docker compose, add missing files from merge conflicts * chore: update pipecat * docs: restructure & add Mintlify pages for tool * chore: upgrade pipecat
470 lines
16 KiB
Python
470 lines
16 KiB
Python
import os
|
|
|
|
from fastapi import WebSocket
|
|
|
|
from api.constants import APP_ROOT_DIR
|
|
from api.db import db_client
|
|
from api.enums import OrganizationConfigurationKey
|
|
from api.services.pipecat.audio_config import AudioConfig
|
|
from api.services.telephony.providers.ari_call_strategies import (
|
|
ARIBridgeSwapStrategy,
|
|
ARIHangupStrategy,
|
|
)
|
|
from api.services.telephony.providers.twilio_call_strategies import (
|
|
TwilioConferenceStrategy,
|
|
TwilioHangupStrategy,
|
|
)
|
|
from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer
|
|
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
|
|
from pipecat.serializers.asterisk import AsteriskFrameSerializer
|
|
from pipecat.serializers.twilio import TwilioFrameSerializer
|
|
from pipecat.serializers.vobiz import VobizFrameSerializer
|
|
from pipecat.serializers.vonage import VonageFrameSerializer
|
|
from pipecat.transports.base_transport import TransportParams
|
|
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
|
|
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
|
|
from pipecat.transports.websocket.fastapi import (
|
|
FastAPIWebsocketParams,
|
|
FastAPIWebsocketTransport,
|
|
)
|
|
|
|
# Normalized string path to the bundled RNNoise shared library, located at
# <APP_ROOT_DIR>/native/rnnoise/librnnoise.so.
librnnoise_path = os.path.normpath(
    str(APP_ROOT_DIR / "native" / "rnnoise" / "librnnoise.so")
)
|
|
|
|
|
|
async def create_twilio_transport(
    websocket_client: WebSocket,
    stream_sid: str,
    call_sid: str,
    workflow_run_id: int,
    audio_config: AudioConfig,
    organization_id: int,
    vad_config: dict | None = None,
    ambient_noise_config: dict | None = None,
):
    """Build the WebSocket transport used for a Twilio call.

    The organization's telephony configuration is read from the database to
    obtain ``account_sid`` and ``auth_token``. Those credentials, together
    with the Twilio transfer and hangup strategies, are handed to a
    ``TwilioFrameSerializer`` that is attached to the returned transport.

    Args:
        websocket_client: WebSocket the transport is bound to.
        stream_sid: Forwarded to the serializer as ``stream_sid``.
        call_sid: Forwarded to the serializer as ``call_sid``.
        workflow_run_id: Not used by this function.
        audio_config: Supplies the transport input/output sample rates.
        organization_id: Organization whose telephony configuration is read.
        vad_config: Not used by this function.
        ambient_noise_config: Optional settings dict. When its ``"enabled"``
            entry is truthy, the office-ambience sound file is mixed into the
            outgoing audio at ``"volume"`` (default ``0.3``); otherwise a
            ``SilenceAudioMixer`` is used.

    Returns:
        A ``FastAPIWebsocketTransport`` configured for audio in and out.

    Raises:
        ValueError: If no configuration (or an empty one) is stored for the
            organization, or if ``account_sid`` / ``auth_token`` is missing.
    """
    # Look up the organization's stored telephony configuration.
    stored_config = await db_client.get_configuration(
        organization_id, OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value
    )
    if not (stored_config and stored_config.value):
        raise ValueError(
            f"Twilio credentials not configured for organization {organization_id}"
        )

    twilio_settings = stored_config.value
    account_sid = twilio_settings.get("account_sid")
    auth_token = twilio_settings.get("auth_token")
    if not (account_sid and auth_token):
        raise ValueError(
            f"Incomplete Twilio configuration for organization {organization_id}"
        )

    # The serializer receives fresh transfer/hangup strategy instances.
    frame_serializer = TwilioFrameSerializer(
        stream_sid=stream_sid,
        call_sid=call_sid,
        account_sid=account_sid,
        auth_token=auth_token,
        transfer_strategy=TwilioConferenceStrategy(),
        hangup_strategy=TwilioHangupStrategy(),
    )

    # Ambient office noise is opt-in; the default is a silence mixer.
    if ambient_noise_config and ambient_noise_config.get("enabled", False):
        ambience_file = (
            APP_ROOT_DIR
            / "assets"
            / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
        )
        out_mixer = SoundfileMixer(
            sound_files={"office": ambience_file},
            default_sound="office",
            volume=ambient_noise_config.get("volume", 0.3),
        )
    else:
        out_mixer = SilenceAudioMixer()

    transport_params = FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        audio_in_sample_rate=audio_config.transport_in_sample_rate,
        audio_out_sample_rate=audio_config.transport_out_sample_rate,
        audio_out_mixer=out_mixer,
        serializer=frame_serializer,
    )
    return FastAPIWebsocketTransport(
        websocket=websocket_client,
        params=transport_params,
    )
|
|
|
|
|
|
async def create_cloudonix_transport(
    websocket_client: WebSocket,
    call_id: str,
    stream_sid: str,
    workflow_run_id: int,
    audio_config: AudioConfig,
    organization_id: int,
    vad_config: dict | None = None,
    ambient_noise_config: dict | None = None,
):
    """Build the WebSocket transport used for a Cloudonix call.

    The organization's telephony configuration is loaded through
    ``load_telephony_config`` and must name ``"cloudonix"`` as its provider
    and carry both ``bearer_token`` and ``domain_id``.

    Args:
        websocket_client: WebSocket the transport is bound to.
        call_id: Forwarded to the serializer as ``call_id``.
        stream_sid: Forwarded to the serializer as ``stream_sid``.
        workflow_run_id: Not used by this function.
        audio_config: Supplies the transport input/output sample rates.
        organization_id: Organization whose telephony configuration is loaded.
        vad_config: Not used by this function.
        ambient_noise_config: Optional settings dict. When its ``"enabled"``
            entry is truthy, the office-ambience sound file is mixed into the
            outgoing audio at ``"volume"`` (default ``0.3``); otherwise a
            ``SilenceAudioMixer`` is used.

    Returns:
        A ``FastAPIWebsocketTransport`` configured for audio in and out, with
        ``audio_out_10ms_chunks`` set to 2.

    Raises:
        ValueError: If the configured provider is not ``"cloudonix"`` or if
            ``bearer_token`` / ``domain_id`` is missing.
    """
    # Imported inside the function, as in the other provider factories.
    from api.services.telephony.factory import load_telephony_config

    telephony_config = await load_telephony_config(organization_id)

    provider = telephony_config.get("provider")
    if provider != "cloudonix":
        raise ValueError(f"Expected Cloudonix provider, got {provider}")

    bearer_token = telephony_config.get("bearer_token")
    domain_id = telephony_config.get("domain_id")
    if not (bearer_token and domain_id):
        raise ValueError(
            f"Incomplete Cloudonix configuration for organization {organization_id}. "
            "Required: bearer_token, domain_id"
        )

    from pipecat.serializers.cloudonix import CloudonixFrameSerializer

    frame_serializer = CloudonixFrameSerializer(
        call_id=call_id,
        stream_sid=stream_sid,
        domain_id=domain_id,
        bearer_token=bearer_token,
    )

    # Ambient office noise is opt-in; the default is a silence mixer.
    if ambient_noise_config and ambient_noise_config.get("enabled", False):
        ambience_file = (
            APP_ROOT_DIR
            / "assets"
            / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
        )
        out_mixer = SoundfileMixer(
            sound_files={"office": ambience_file},
            default_sound="office",
            volume=ambient_noise_config.get("volume", 0.3),
        )
    else:
        out_mixer = SilenceAudioMixer()

    transport_params = FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        audio_in_sample_rate=audio_config.transport_in_sample_rate,
        audio_out_sample_rate=audio_config.transport_out_sample_rate,
        audio_out_mixer=out_mixer,
        serializer=frame_serializer,
        audio_out_10ms_chunks=2,
    )
    return FastAPIWebsocketTransport(
        websocket=websocket_client,
        params=transport_params,
    )
|
|
|
|
|
|
async def create_ari_transport(
    websocket_client: WebSocket,
    channel_id: str,
    workflow_run_id: int,
    audio_config: AudioConfig,
    organization_id: int,
    vad_config: dict | None = None,
    ambient_noise_config: dict | None = None,
):
    """Build the WebSocket transport used for an Asterisk ARI call.

    The organization's telephony configuration is loaded through
    ``load_telephony_config`` and must name ``"ari"`` as its provider and
    carry ``ari_endpoint``, ``app_name`` and ``app_password``. These are
    passed to an ``AsteriskFrameSerializer`` along with the ARI transfer and
    hangup strategies.

    Args:
        websocket_client: WebSocket the transport is bound to.
        channel_id: Forwarded to the serializer as ``channel_id``.
        workflow_run_id: Not used by this function.
        audio_config: Supplies the transport sample rates and the pipeline
            sample rate given to the serializer.
        organization_id: Organization whose telephony configuration is loaded.
        vad_config: Not used by this function.
        ambient_noise_config: Optional settings dict. When its ``"enabled"``
            entry is truthy, the office-ambience sound file is mixed into the
            outgoing audio at ``"volume"`` (default ``0.3``); otherwise a
            ``SilenceAudioMixer`` is used.

    Returns:
        A ``FastAPIWebsocketTransport`` configured for audio in and out.

    Raises:
        ValueError: If the configured provider is not ``"ari"`` or if any of
            the three required settings is missing.
    """
    from api.services.telephony.factory import load_telephony_config

    telephony_config = await load_telephony_config(organization_id)

    provider = telephony_config.get("provider")
    if provider != "ari":
        raise ValueError(f"Expected ARI provider, got {provider}")

    ari_endpoint = telephony_config.get("ari_endpoint")
    app_name = telephony_config.get("app_name")
    app_password = telephony_config.get("app_password")
    if not all((ari_endpoint, app_name, app_password)):
        raise ValueError(
            f"Incomplete ARI configuration for organization {organization_id}. "
            "Required: ari_endpoint, app_name, app_password"
        )

    # The serializer receives fresh transfer/hangup strategy instances.
    frame_serializer = AsteriskFrameSerializer(
        channel_id=channel_id,
        ari_endpoint=ari_endpoint,
        app_name=app_name,
        app_password=app_password,
        transfer_strategy=ARIBridgeSwapStrategy(),
        hangup_strategy=ARIHangupStrategy(),
        params=AsteriskFrameSerializer.InputParams(
            asterisk_sample_rate=audio_config.transport_in_sample_rate,
            sample_rate=audio_config.pipeline_sample_rate,
        ),
    )

    # Ambient office noise is opt-in; the default is a silence mixer.
    if ambient_noise_config and ambient_noise_config.get("enabled", False):
        ambience_file = (
            APP_ROOT_DIR
            / "assets"
            / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
        )
        out_mixer = SoundfileMixer(
            sound_files={"office": ambience_file},
            default_sound="office",
            volume=ambient_noise_config.get("volume", 0.3),
        )
    else:
        out_mixer = SilenceAudioMixer()

    transport_params = FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        audio_in_sample_rate=audio_config.transport_in_sample_rate,
        audio_out_sample_rate=audio_config.transport_out_sample_rate,
        audio_out_mixer=out_mixer,
        serializer=frame_serializer,
    )
    return FastAPIWebsocketTransport(
        websocket=websocket_client,
        params=transport_params,
    )
|
|
|
|
|
|
async def create_vonage_transport(
    websocket_client: WebSocket,
    call_uuid: str,
    workflow_run_id: int,
    audio_config: AudioConfig,
    organization_id: int,
    vad_config: dict | None = None,
    ambient_noise_config: dict | None = None,
):
    """Build the WebSocket transport used for a Vonage call.

    The organization's telephony configuration is loaded through
    ``load_telephony_config`` and must name ``"vonage"`` as its provider and
    carry both ``application_id`` and ``private_key``.

    Args:
        websocket_client: WebSocket the transport is bound to. Annotated as
            ``WebSocket`` to match the other provider factories in this
            module, which pass it to ``FastAPIWebsocketTransport`` the same
            way.
        call_uuid: Forwarded to the serializer as ``call_uuid``.
        workflow_run_id: Not used by this function.
        audio_config: Supplies the transport sample rates and the pipeline
            sample rate given to the serializer.
        organization_id: Organization whose telephony configuration is loaded.
        vad_config: Not used by this function.
        ambient_noise_config: Optional settings dict. When its ``"enabled"``
            entry is truthy, the office-ambience sound file is mixed into the
            outgoing audio at ``"volume"`` (default ``0.3``); otherwise a
            ``SilenceAudioMixer`` is used.

    Returns:
        A ``FastAPIWebsocketTransport`` configured for audio in and out.

    Raises:
        ValueError: If the configured provider is not ``"vonage"`` or if
            ``application_id`` / ``private_key`` is missing.
    """
    # Use the factory to load config from database
    from api.services.telephony.factory import load_telephony_config

    telephony_config = await load_telephony_config(organization_id)

    provider = telephony_config.get("provider")
    if provider != "vonage":
        raise ValueError(f"Expected Vonage provider, got {provider}")

    application_id = telephony_config.get("application_id")
    private_key = telephony_config.get("private_key")
    if not (application_id and private_key):
        raise ValueError(
            f"Incomplete Vonage configuration for organization {organization_id}"
        )

    frame_serializer = VonageFrameSerializer(
        call_uuid=call_uuid,
        application_id=application_id,
        private_key=private_key,
        params=VonageFrameSerializer.InputParams(
            vonage_sample_rate=audio_config.transport_in_sample_rate,
            sample_rate=audio_config.pipeline_sample_rate,
        ),
    )

    # Ambient office noise is opt-in; the default is a silence mixer.
    if ambient_noise_config and ambient_noise_config.get("enabled", False):
        ambience_file = (
            APP_ROOT_DIR
            / "assets"
            / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
        )
        out_mixer = SoundfileMixer(
            sound_files={"office": ambience_file},
            default_sound="office",
            volume=ambient_noise_config.get("volume", 0.3),
        )
    else:
        out_mixer = SilenceAudioMixer()

    # Important: Vonage uses binary WebSocket mode, not text
    transport_params = FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        audio_in_sample_rate=audio_config.transport_in_sample_rate,
        audio_out_sample_rate=audio_config.transport_out_sample_rate,
        audio_out_mixer=out_mixer,
        serializer=frame_serializer,
    )
    return FastAPIWebsocketTransport(
        websocket=websocket_client,
        params=transport_params,
    )
|
|
|
|
|
|
async def create_vobiz_transport(
    websocket_client: WebSocket,
    stream_id: str,
    call_id: str,
    workflow_run_id: int,
    audio_config: AudioConfig,
    organization_id: int,
    vad_config: dict | None = None,
    ambient_noise_config: dict | None = None,
):
    """Create a transport for Vobiz connections.

    Vobiz uses Plivo-compatible WebSocket protocol:
    - MULAW audio at 8kHz (same as Twilio)
    - Base64-encoded audio in JSON messages
    - VobizFrameSerializer handles the protocol

    The organization's telephony configuration is loaded through
    ``load_telephony_config`` and must name ``"vobiz"`` as its provider and
    carry both ``auth_id`` and ``auth_token``.

    Args:
        websocket_client: WebSocket the transport is bound to.
        stream_id: Forwarded to the serializer as ``stream_id``.
        call_id: Forwarded to the serializer as ``call_id``.
        workflow_run_id: Used only as a prefix in log messages.
        audio_config: Supplies the transport sample rates and the pipeline
            sample rate given to the serializer.
        organization_id: Organization whose telephony configuration is loaded.
        vad_config: Not used by this function.
        ambient_noise_config: Optional settings dict. When its ``"enabled"``
            entry is truthy, the office-ambience sound file is mixed into the
            outgoing audio at ``"volume"`` (default ``0.3``); otherwise a
            ``SilenceAudioMixer`` is used.

    Returns:
        A ``FastAPIWebsocketTransport`` configured for audio in and out.

    Raises:
        ValueError: If the configured provider is not ``"vobiz"`` or if
            ``auth_id`` / ``auth_token`` is missing.
    """
    from loguru import logger

    logger.info(
        f"[run {workflow_run_id}] Creating Vobiz transport - "
        f"stream_id={stream_id}, call_id={call_id}"
    )

    # Load Vobiz configuration from database
    from api.services.telephony.factory import load_telephony_config

    config = await load_telephony_config(organization_id)

    if config.get("provider") != "vobiz":
        raise ValueError(f"Expected Vobiz provider, got {config.get('provider')}")

    auth_id = config.get("auth_id")
    auth_token = config.get("auth_token")

    if not auth_id or not auth_token:
        raise ValueError(
            f"Incomplete Vobiz configuration for organization {organization_id}"
        )

    # `or []` guards against a stored ``from_numbers`` of None; with a plain
    # ``.get(key, [])`` default the len() call would raise TypeError and a
    # debug log line would abort transport creation.
    from_numbers = config.get("from_numbers") or []
    logger.debug(
        f"[run {workflow_run_id}] Vobiz config loaded - auth_id={auth_id}, "
        f"from_numbers={len(from_numbers)} numbers"
    )

    # Use VobizFrameSerializer for Vobiz WebSocket protocol
    serializer = VobizFrameSerializer(
        stream_id=stream_id,
        call_id=call_id,
        auth_id=auth_id,
        auth_token=auth_token,
        params=VobizFrameSerializer.InputParams(
            vobiz_sample_rate=8000,  # Vobiz uses MULAW at 8kHz
            sample_rate=audio_config.pipeline_sample_rate,
        ),
    )

    logger.debug(
        f"[run {workflow_run_id}] VobizFrameSerializer created for Vobiz - "
        f"transport_rate=8000Hz, pipeline_rate={audio_config.pipeline_sample_rate}Hz"
    )

    # Ambient office noise is opt-in; the default is a silence mixer.
    if ambient_noise_config and ambient_noise_config.get("enabled", False):
        audio_out_mixer = SoundfileMixer(
            sound_files={
                "office": APP_ROOT_DIR
                / "assets"
                / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
            },
            default_sound="office",
            volume=ambient_noise_config.get("volume", 0.3),
        )
    else:
        audio_out_mixer = SilenceAudioMixer()

    # Create WebSocket transport (same structure as Twilio/Vonage)
    transport = FastAPIWebsocketTransport(
        websocket=websocket_client,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            audio_in_sample_rate=audio_config.transport_in_sample_rate,
            audio_out_sample_rate=audio_config.transport_out_sample_rate,
            audio_out_mixer=audio_out_mixer,
            serializer=serializer,
        ),
    )

    # NOTE(review): the message says "VAD enabled", but vad_config is not
    # consumed anywhere in this function — confirm VAD is set up elsewhere.
    logger.info(
        f"[run {workflow_run_id}] Vobiz transport created successfully (VAD enabled)"
    )
    return transport
|
|
|
|
|
|
def create_webrtc_transport(
    webrtc_connection: SmallWebRTCConnection,
    workflow_run_id: int,
    audio_config: AudioConfig,
    vad_config: dict | None = None,
    ambient_noise_config: dict | None = None,
):
    """Build the transport used for a WebRTC connection.

    Args:
        webrtc_connection: Connection the transport is bound to.
        workflow_run_id: Not used by this function.
        audio_config: Supplies the transport input/output sample rates.
        vad_config: Not used by this function.
        ambient_noise_config: Optional settings dict. When its ``"enabled"``
            entry is truthy, the office-ambience sound file is mixed into the
            outgoing audio at ``"volume"`` (default ``0.3``); otherwise a
            ``SilenceAudioMixer`` is used.

    Returns:
        A ``SmallWebRTCTransport`` configured for audio in and out.
    """
    # Ambient office noise is opt-in; the default is a silence mixer.
    if ambient_noise_config and ambient_noise_config.get("enabled", False):
        ambience_file = (
            APP_ROOT_DIR
            / "assets"
            / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
        )
        out_mixer = SoundfileMixer(
            sound_files={"office": ambience_file},
            default_sound="office",
            volume=ambient_noise_config.get("volume", 0.3),
        )
    else:
        out_mixer = SilenceAudioMixer()

    transport_params = TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        audio_in_sample_rate=audio_config.transport_in_sample_rate,
        audio_out_sample_rate=audio_config.transport_out_sample_rate,
        audio_out_mixer=out_mixer,
    )
    return SmallWebRTCTransport(
        webrtc_connection=webrtc_connection,
        params=transport_params,
    )
|
|
|
|
|
|
def create_internal_transport(
    workflow_run_id: int,
    audio_config: AudioConfig,
    latency_seconds: float = 0.0,
    vad_config: dict | None = None,
    ambient_noise_config: dict | None = None,
):
    """Placeholder for the agent-to-agent (LoopTalk) internal transport.

    The implementation is currently disabled (see the commented-out
    reference below), so this function ignores every argument and returns
    ``None``. Callers must not expect a transport instance from it.

    Args:
        workflow_run_id: ID of the workflow run (currently unused).
        audio_config: Audio configuration for the transport (currently unused).
        latency_seconds: Network latency to simulate (currently unused).
        vad_config: Currently unused.
        ambient_noise_config: Currently unused.

    Returns:
        None. No transport is created while the implementation is disabled.
    """
    # Commented out because looptalk coming in the regular import flow
    # was causing issue. May be move this to looptalk/orchestrator.py
    #
    # Create and return the internal transport with latency
    # return InternalTransport(
    #     params=TransportParams(
    #         audio_out_enabled=True,
    #         audio_out_sample_rate=audio_config.transport_out_sample_rate,
    #         audio_out_channels=1,
    #         audio_in_enabled=True,
    #         audio_in_sample_rate=audio_config.transport_in_sample_rate,
    #         audio_in_channels=1,
    #         audio_out_mixer=(
    #             SoundfileMixer(
    #                 sound_files={
    #                     "office": APP_ROOT_DIR
    #                     / "assets"
    #                     / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
    #                 },
    #                 default_sound="office",
    #                 volume=ambient_noise_config.get("volume", 0.3),
    #             )
    #             if ambient_noise_config and ambient_noise_config.get("enabled", False)
    #             else SilenceAudioMixer()
    #         ),
    #     ),
    #     latency_seconds=latency_seconds,
    # )
    return None
|