diff --git a/api/routes/telephony.py b/api/routes/telephony.py index cc7ce55..7a24c3a 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -6,8 +6,7 @@ Consolidated from split modules for easier maintenance. import json import uuid from datetime import UTC, datetime -from typing import Optional - +from typing import Optional, Any, Dict from fastapi import ( APIRouter, Depends, @@ -49,6 +48,7 @@ from api.utils.telephony_helper import ( normalize_webhook_data, numbers_match, parse_webhook_request, + parse_cloudonix_amd_callback ) from pipecat.utils.run_context import set_current_run_id @@ -1182,13 +1182,12 @@ async def handle_cloudonix_status_callback( logger.warning(f"Workflow run {workflow_run_id} not found for status callback") return {"status": "ignored", "reason": "workflow_run_not_found"} - # Get workflow and provider - workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) - if not workflow: + # Get workflow from workflow_run and get the provider + if not workflow_run.workflow: logger.warning(f"Workflow {workflow_run.workflow_id} not found") return {"status": "ignored", "reason": "workflow_not_found"} - provider = await get_telephony_provider(workflow.organization_id) + provider = await get_telephony_provider(workflow_run.workflow.organization_id) # Parse the callback data into generic format parsed_data = provider.parse_status_callback(callback_data) @@ -1209,6 +1208,71 @@ async def handle_cloudonix_status_callback( return {"status": "success"} +@router.post("/cloudonix/amd-callback/{workflow_run_id}") +async def handle_cloudonix_amd_callback( + workflow_run_id: int, + request: Request, +): + """Handle Cloudonix-specific status callbacks. + + Cloudonix sends call status updates to the callback URL specified during call initiation. + """ + set_current_run_id(workflow_run_id) + # Parse callback data - determine if JSON or form data + content_type = request.headers.get("content-type", "") + + if "application/json" in content_type: + callback_data = await request.json() + else: + # Assume form data (like Twilio) + form_data = await request.form() + callback_data = dict(form_data) + + logger.info( + f"[run {workflow_run_id}] Received Cloudonix AMD status callback: {json.dumps(callback_data)}" + ) + + call_id = callback_data["CallSid"] + answered_by = callback_data["AnsweredBy"] + logger.info( + f"[run {workflow_run_id}] returning from the AMD callback for call answered-by: {answered_by}" +) + + if answered_by in ["human", "unknown"]: + logger.info( + f"[run {workflow_run_id}] returning from the AMD callback for call answered-by: {answered_by}" +) + return {"status": answered_by} + + logger.info( + f"[run {workflow_run_id}] proceeding with call hang up and workflow run update for call answered-by: {answered_by}" + ) + workflow_run = await db_client.get_workflow_run_by_call_id(call_id) + if not workflow_run: + logger.warning(f"No workflow run found for Cloudonix call_id: {call_id}") + return {"status": "ignored", "reason": "workflow_run_not_found"} + organization_id = workflow_run.workflow.organization_id + provider = await get_telephony_provider(organization_id) + + # Parse the AMD callback data to persist in workflow_run + parsed_data = parse_cloudonix_amd_callback(callback_data) + + # Create status update for AMD detection + status_update = StatusCallbackRequest( + call_id=parsed_data["call_id"], + status=f"amd_{parsed_data['answered_by']}", + extra_data=parsed_data, + ) + + # Process the status update + await _process_status_update(workflow_run_id, status_update) + + logger.info( + f"[run {workflow_run_id}] Call answered by: {answered_by}, hang up call with call_id: {call_id}" + ) + await provider.hang_up(call_id) + return {"status": answered_by} + @router.post("/vobiz/hangup-callback/workflow/{workflow_id}") async def handle_vobiz_hangup_callback_by_workflow( diff --git a/api/services/telephony/base.py b/api/services/telephony/base.py index a154946..b746984 100644 --- a/api/services/telephony/base.py +++ b/api/services/telephony/base.py @@ -352,3 +352,17 @@ class TelephonyProvider(ABC): True if provider supports call transfers, False otherwise """ pass + + @abstractmethod + async def hang_up(self, call_id: str, **kwargs: Any) -> Dict[str, Any]: + """ + Terminate an active call. + + Args: + call_id: Provider-specific call identifier + **kwargs: Provider-specific additional parameters + + Returns: + Dict containing hangup response (format varies by provider) + """ + pass diff --git a/api/services/telephony/providers/ari_provider.py b/api/services/telephony/providers/ari_provider.py index 139065a..6bbb4b4 100644 --- a/api/services/telephony/providers/ari_provider.py +++ b/api/services/telephony/providers/ari_provider.py @@ -418,3 +418,8 @@ class ARIProvider(TelephonyProvider): f"&app={self.app_name}" f"&subscribeAll=true" ) + + async def hang_up(self, call_id: str, **kwargs: Any) -> Dict[str, Any]: + """Terminate an ARI call.""" + # TODO: Implement ARI call termination + return {"status": "not_implemented"} diff --git a/api/services/telephony/providers/cloudonix_provider.py b/api/services/telephony/providers/cloudonix_provider.py index 9269d7e..f01181e 100644 --- a/api/services/telephony/providers/cloudonix_provider.py +++ b/api/services/telephony/providers/cloudonix_provider.py @@ -106,6 +106,16 @@ class CloudonixProvider(TelephonyProvider): "caller-id": from_number, # Required field } + # Enable and process AMD + data["machineDetection"] = "DetectMessageEnd" + data["asyncAmd"] = True + data["machineDetectionTimeout"] = 30 + data["machineDetectionSpeechThreshold"] = 2500 + data["machineDetectionSpeechEndThreshold"] = 6000 + data["machineDetectionSilenceTimeout"] = 2500 + data["asyncAmdStatusCallback"] = f"{backend_endpoint}/api/v1/telephony/cloudonix/amd-callback/{workflow_run_id}" + data["asyncAmdStatusCallbackMethod"]= "POST" + # TODO: Cloudonix status callbacks are spammy, so commenting it out. Can send it to # some persistent logging system instead of transcational database. # Add status callback if workflow_run_id provided @@ -682,6 +692,42 @@ class CloudonixProvider(TelephonyProvider): return Response(content=twiml, media_type="application/xml"), "application/xml" + async def hang_up(self, call_id: str, **kwargs: Any) -> Dict[str, Any]: + + # Construct the DELETE session endpoint + # Using "self" as customer-id as per Cloudonix documentation + endpoint = ( + f"{self.base_url}/customers/self/domains/{self.domain_id}/sessions/{call_id}" + ) + + # Prepare headers with Bearer token authentication + headers = { + "Authorization": f"Bearer {self.bearer_token}", + "Content-Type": "application/json", + } + + logger.info(f"Terminating Cloudonix call {call_id} via DELETE {endpoint}") + + # Make the DELETE request to terminate the session + async with aiohttp.ClientSession() as session: + async with session.delete(endpoint, headers=headers) as response: + status = response.status + response_text = await response.text() + + if status in (200, 204, 404): + # 200/204: Success + # 404: Session already terminated (acceptable) + logger.info( + f"Successfully terminated Cloudonix session {call_id} " + f"(HTTP {status}), Response: {response_text}" + ) + else: + logger.warning( + f"Unexpected response terminating Cloudonix session {call_id}: " + f"HTTP {status}, Response: {response_text}" + ) + return {"status": "success"} + # ======== CALL TRANSFER METHODS ======== async def transfer_call( diff --git a/api/services/telephony/providers/twilio_provider.py b/api/services/telephony/providers/twilio_provider.py index 764227e..1c4c148 100644 --- a/api/services/telephony/providers/twilio_provider.py +++ b/api/services/telephony/providers/twilio_provider.py @@ -587,3 +587,8 @@ class TwilioProvider(TelephonyProvider): True - Twilio provider supports call transfers """ return True + + async def hang_up(self, call_id: str, **kwargs: Any) -> Dict[str, Any]: + """Terminate a Twilio call.""" + # TODO: Implement Twilio call termination + return {"status": "not_implemented"} diff --git a/api/services/telephony/providers/vobiz_provider.py b/api/services/telephony/providers/vobiz_provider.py index 7e91bed..4ed4dd6 100644 --- a/api/services/telephony/providers/vobiz_provider.py +++ b/api/services/telephony/providers/vobiz_provider.py @@ -560,3 +560,8 @@ class VobizProvider(TelephonyProvider): False - Vobiz provider does not support call transfers """ return False + + async def hang_up(self, call_id: str, **kwargs: Any) -> Dict[str, Any]: + """Terminate a Vobiz call.""" + # TODO: Implement Vobiz call termination + return {"status": "not_implemented"} diff --git a/api/services/telephony/providers/vonage_provider.py b/api/services/telephony/providers/vonage_provider.py index 357d5b4..a6fe3fd 100644 --- a/api/services/telephony/providers/vonage_provider.py +++ b/api/services/telephony/providers/vonage_provider.py @@ -511,3 +511,8 @@ class VonageProvider(TelephonyProvider): False - Vonage provider does not support call transfers """ return False + + async def hang_up(self, call_id: str, **kwargs: Any) -> Dict[str, Any]: + """Terminate a Vonage call.""" + # TODO: Implement Vonage call termination + return {"status": "not_implemented"} diff --git a/api/utils/telephony_helper.py b/api/utils/telephony_helper.py index ee51e46..43dd906 100644 --- a/api/utils/telephony_helper.py +++ b/api/utils/telephony_helper.py @@ -231,3 +231,30 @@ def get_countries_for_code(dialing_code: str) -> list[str]: return [] return [country for country, code in COUNTRY_CODES.items() if code == dialing_code] + + +def parse_cloudonix_amd_callback(data: dict) -> dict: + """ + Parse Cloudonix AMD callback data into generic format. + + Note: This is Cloudonix-specific and not part of the generic provider interface + as AMD callbacks are currently only supported by Cloudonix. + + Args: + data: Raw AMD callback data from Cloudonix + + Returns: + Dict with parsed AMD information + """ + return { + "call_id": data.get("CallSid", ""), + "session": data.get("Session", ""), + "answered_by": data.get("AnsweredBy", ""), + "from_number": data.get("From", ""), + "to_number": data.get("To", ""), + "call_status": data.get("CallStatus", ""), + "domain": data.get("Domain", ""), + "direction": data.get("Direction", ""), + "account_sid": data.get("AccountSid", ""), + "api_version": data.get("ApiVersion", "") + } diff --git a/docker-compose.yaml b/docker-compose.yaml index 5e7ce32..2ad8dd6 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -59,155 +59,155 @@ services: networks: - app-network - nginx: - image: nginx:alpine - container_name: nginx_https - profiles: ["remote"] - depends_on: - - ui - ports: - - "80:80" - - "443:443" - volumes: - - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro - - ./certs:/etc/nginx/certs:ro - networks: - - app-network + # nginx: + # image: nginx:alpine + # container_name: nginx_https + # profiles: ["remote"] + # depends_on: + # - ui + # ports: + # - "80:80" + # - "443:443" + # volumes: + # - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro + # - ./certs:/etc/nginx/certs:ro + # networks: + # - app-network - api: - image: ${REGISTRY:-dograhai}/dograh-api:latest - volumes: - - shared-tmp:/tmp - environment: - # Core application config - ENVIRONMENT: "local" - LOG_LEVEL: "INFO" + # api: + # image: ${REGISTRY:-dograhai}/dograh-api:latest + # volumes: + # - shared-tmp:/tmp + # environment: + # # Core application config + # ENVIRONMENT: "local" + # LOG_LEVEL: "INFO" - # Replace this environment variable if you are using a custom - # domain to host the stack - BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}" + # # Replace this environment variable if you are using a custom + # # domain to host the stack + # BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}" - # Database configuration (using containerized postgres) - DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres" + # # Database configuration (using containerized postgres) + # DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres" - # Redis configuration (using containerized redis) - REDIS_URL: "redis://:redissecret@redis:6379" + # # Redis configuration (using containerized redis) + # REDIS_URL: "redis://:redissecret@redis:6379" - # Storage configuration - using local MinIO - ENABLE_AWS_S3: "false" + # # Storage configuration - using local MinIO + # ENABLE_AWS_S3: "false" - # MinIO - MINIO_ENDPOINT: "minio:9000" - MINIO_ACCESS_KEY: "minioadmin" - MINIO_SECRET_KEY: "minioadmin" - MINIO_BUCKET: "voice-audio" - MINIO_SECURE: "false" + # # MinIO + # MINIO_ENDPOINT: "minio:9000" + # MINIO_ACCESS_KEY: "minioadmin" + # MINIO_SECRET_KEY: "minioadmin" + # MINIO_BUCKET: "voice-audio" + # MINIO_SECURE: "false" - # FastAPI workers count - FASTAPI_WORKERS: 1 + # # FastAPI workers count + # FASTAPI_WORKERS: 1 - # Langfuse - ENABLE_TRACING: "false" - # LANGFUSE_SECRET_KEY: "" - # LANGFUSE_PUBLIC_KEY: "" - # LANGFUSE_HOST: "" + # # Langfuse + # ENABLE_TRACING: "false" + # # LANGFUSE_SECRET_KEY: "" + # # LANGFUSE_PUBLIC_KEY: "" + # # LANGFUSE_HOST: "" - # TURN server configuration (for WebRTC NAT traversal in remote server) - # Uses time-limited credentials via TURN REST API (HMAC-SHA1) - TURN_HOST: "${TURN_HOST:-}" - TURN_SECRET: "${TURN_SECRET:-}" + # # TURN server configuration (for WebRTC NAT traversal in remote server) + # # Uses time-limited credentials via TURN REST API (HMAC-SHA1) + # TURN_HOST: "${TURN_HOST:-}" + # TURN_SECRET: "${TURN_SECRET:-}" - OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}" + # OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}" - ports: - - "8000:8000" - depends_on: - postgres: - condition: service_healthy - redis: - condition: service_healthy - minio: - condition: service_healthy - cloudflared: - condition: service_started - healthcheck: - test: - [ - "CMD-SHELL", - 'python -c "import urllib.request; urllib.request.urlopen(''http://localhost:8000/api/v1/health'').read()"', - ] - interval: 30s - timeout: 10s - retries: 3 - start_period: 60s - networks: - - app-network + # ports: + # - "8000:8000" + # depends_on: + # postgres: + # condition: service_healthy + # redis: + # condition: service_healthy + # minio: + # condition: service_healthy + # cloudflared: + # condition: service_started + # healthcheck: + # test: + # [ + # "CMD-SHELL", + # 'python -c "import urllib.request; urllib.request.urlopen(''http://localhost:8000/api/v1/health'').read()"', + # ] + # interval: 30s + # timeout: 10s + # retries: 3 + # start_period: 60s + # networks: + # - app-network - ui: - image: ${REGISTRY:-dograhai}/dograh-ui:latest - environment: - # Server-side URL (SSR, internal Docker network) - BACKEND_URL: "${BACKEND_URL:-http://api:8000}" - NODE_ENV: "oss" - # Flag to enable/ disable posthog - ENABLE_TELEMETRY: "${ENABLE_TELEMETRY:-true}" + # ui: + # image: ${REGISTRY:-dograhai}/dograh-ui:latest + # environment: + # # Server-side URL (SSR, internal Docker network) + # BACKEND_URL: "${BACKEND_URL:-http://api:8000}" + # NODE_ENV: "oss" + # # Flag to enable/ disable posthog + # ENABLE_TELEMETRY: "${ENABLE_TELEMETRY:-true}" - # Posthog - POSTHOG_KEY: "phc_ItizB1dP6yv7ZYobbcqrpxTdbomDA8hJFSEmAMdYvIr" - POSTHOG_HOST: "https://us.posthog.com" - ports: - - "3010:3010" - depends_on: - api: - condition: service_healthy - healthcheck: - test: - [ - "CMD-SHELL", - "wget --no-verbose --tries=1 --spider http://localhost:3010 || exit 1", - ] - interval: 30s - timeout: 10s - retries: 3 - start_period: 30s - networks: - - app-network + # # Posthog + # POSTHOG_KEY: "phc_ItizB1dP6yv7ZYobbcqrpxTdbomDA8hJFSEmAMdYvIr" + # POSTHOG_HOST: "https://us.posthog.com" + # ports: + # - "3010:3010" + # depends_on: + # api: + # condition: service_healthy + # healthcheck: + # test: + # [ + # "CMD-SHELL", + # "wget --no-verbose --tries=1 --spider http://localhost:3010 || exit 1", + # ] + # interval: 30s + # timeout: 10s + # retries: 3 + # start_period: 30s + # networks: + # - app-network - cloudflared: - image: cloudflare/cloudflared:latest - container_name: cloudflared-tunnel - command: tunnel --no-autoupdate --url http://api:8000 --metrics 0.0.0.0:2000 - ports: - - "2000:2000" # Expose metrics endpoint - networks: - - app-network + # cloudflared: + # image: cloudflare/cloudflared:latest + # container_name: cloudflared-tunnel + # command: tunnel --no-autoupdate --url http://api:8000 --metrics 0.0.0.0:2000 + # ports: + # - "2000:2000" # Expose metrics endpoint + # networks: + # - app-network - coturn: - image: coturn/coturn:4.8.0 - container_name: coturn - restart: unless-stopped - profiles: ["remote"] - ports: - - "3478:3478/udp" - - "3478:3478/tcp" - - "5349:5349/udp" - - "5349:5349/tcp" - - "49152-49200:49152-49200/udp" - volumes: - - ./turnserver.conf:/etc/coturn/turnserver.conf:ro - command: - - -c - - /etc/coturn/turnserver.conf - networks: - - app-network + # coturn: + # image: coturn/coturn:4.8.0 + # container_name: coturn + # restart: unless-stopped + # profiles: ["remote"] + # ports: + # - "3478:3478/udp" + # - "3478:3478/tcp" + # - "5349:5349/udp" + # - "5349:5349/tcp" + # - "49152-49200:49152-49200/udp" + # volumes: + # - ./turnserver.conf:/etc/coturn/turnserver.conf:ro + # command: + # - -c + # - /etc/coturn/turnserver.conf + # networks: + # - app-network volumes: postgres_data: redis_data: minio-data: driver: local - shared-tmp: - driver: local + # shared-tmp: + # driver: local networks: app-network: diff --git a/pipecat b/pipecat index 6aa0502..3559c6a 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 6aa0502a9834d536aba9589cec87d827e66f2fad +Subproject commit 3559c6a8eb9cc82b400d745ab62ed037a420b42f diff --git a/ui/package-lock.json b/ui/package-lock.json index 2862dc7..cc11b0f 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -1,12 +1,12 @@ { "name": "ui", - "version": "1.13.0", + "version": "1.15.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "ui", - "version": "1.13.0", + "version": "1.15.0", "dependencies": { "@dagrejs/dagre": "^1.1.4", "@hey-api/client-fetch": "^0.10.0",