fix: docker compose, add missing files from merge conflicts

This commit is contained in:
Sabiha Khan 2026-02-26 19:09:37 +05:30
parent 99ebede3e3
commit 050c645121
6 changed files with 531 additions and 94 deletions

View file

@ -276,7 +276,7 @@ class ARIConnection:
else:
# Outbound call (state == "Up") — originated by us
# Check if this is a transfer destination channel (app_args starts with "transfer")
# Transfer destinations run externally - we only track status to publish transfer event, not run our pipeline
# Transfer destinations run externally - we only track status to publish transfer event, not run the pipeline
transfer_id = self._get_transfer_id(app_args)
if transfer_id:
logger.info(
@ -318,7 +318,6 @@ class ARIConnection:
logger.info(
f"[ARI org={self.organization_id}] StasisEnd: channel={channel_id}"
)
workflow_run_id = await self._get_channel_run(channel_id)
if workflow_run_id:
asyncio.create_task(
@ -629,7 +628,8 @@ class ARIConnection:
bridge_id = ctx.get("bridge_id")
transfer_state = ctx.get("transfer_state")
# Check if this is transfer-protected external channe. Skip full teardown if transfer is in progress and this is the external media channel
# Check if this is a call transfer scenario external channel. Skip full teardown if
# transfer is in progress and this is the external media channel
# During call transfer, we preserve the caller-destination bridge
if (
transfer_state == "in-progress"
@ -811,7 +811,6 @@ class ARIConnection:
try:
logger.info(f"[ARI Transfer] Transfer {transfer_id} failed: {reason}")
# Get transfer context
transfer_manager = await self._get_transfer_manager()
context = await transfer_manager.get_transfer_context(transfer_id)

View file

@ -0,0 +1,253 @@
"""ARI-specific call operation strategies.
This module contains the business logic for Asterisk ARI call operations.
"""
from typing import Any, Dict
from loguru import logger
from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy
class ARIBridgeSwapStrategy(TransferStrategy):
"""Implements bridge swap transfer for Asterisk ARI.
This strategy handles transferring calls by swapping channels in existing
bridges, managing transfer contexts, and publishing
transfer completion events.
"""
async def execute_transfer(self, context: Dict[str, Any]) -> bool:
"""Execute bridge swap transfer for Asterisk ARI."""
try:
import aiohttp
import redis.asyncio as aioredis
from aiohttp import BasicAuth
channel_id = context["channel_id"]
ari_endpoint = context["ari_endpoint"]
app_name = context["app_name"]
app_password = context["app_password"]
if not channel_id or not ari_endpoint:
logger.warning(
"Cannot execute transfer: missing channel_id or ari_endpoint"
)
return False
logger.info(
f"[ARI Transfer] Executing bridge swap for channel {channel_id}"
)
from api.constants import REDIS_URL
from api.db import db_client
auth = BasicAuth(app_name, app_password)
# 1. Find active transfer context for this caller channel
transfer_context = await self._find_transfer_context_for_call(channel_id)
if not transfer_context:
logger.error(
f"[ARI Transfer] No active transfer context found for caller {channel_id}"
)
return False
logger.info(
f"[ARI Transfer] Found transfer context: {transfer_context.transfer_id}, "
f"destination: {transfer_context.call_sid}"
)
# 2. Get workflow run to find current bridge and external media channel
redis = aioredis.from_url(REDIS_URL, decode_responses=True)
workflow_run_id = await redis.get(f"ari:channel:{channel_id}")
if not workflow_run_id:
logger.error(
f"[ARI Transfer] No workflow run found for caller {channel_id}"
)
return False
workflow_run = await db_client.get_workflow_run_by_id(int(workflow_run_id))
if not workflow_run or not workflow_run.gathered_context:
logger.error(
f"[ARI Transfer] No workflow context found for run {workflow_run_id}"
)
return False
ctx = workflow_run.gathered_context
bridge_id = ctx.get("bridge_id")
ext_channel_id = ctx.get("ext_channel_id")
if not bridge_id or not ext_channel_id:
logger.error(
f"[ARI Transfer] Missing bridge/external channel info: {ctx}"
)
return False
destination_channel_id = transfer_context.call_sid
if not destination_channel_id:
logger.error(
f"[ARI Transfer] No destination channel in transfer context"
)
return False
logger.info(
f"[ARI Transfer] Bridge swap: bridge={bridge_id}, caller={channel_id}, "
f"destination={destination_channel_id}, ext_media={ext_channel_id}"
)
# 3. Set transfer state to prevent StasisEnd auto-teardown
workflow_run.gathered_context["transfer_state"] = "in-progress"
await db_client.update_workflow_run(
run_id=int(workflow_run_id),
gathered_context=workflow_run.gathered_context,
)
logger.debug(
f"[ARI Transfer] Set transfer_state=in-progress for workflow {workflow_run_id}"
)
# 4. Execute bridge swap operations via ARI REST API
async with aiohttp.ClientSession() as session:
# Add destination channel to existing bridge
add_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/addChannel"
async with session.post(
add_url, auth=auth, params={"channel": destination_channel_id}
) as response:
if response.status in (200, 204):
logger.info(
f"[ARI Transfer] Added destination {destination_channel_id} to bridge {bridge_id}"
)
else:
error_text = await response.text()
logger.error(
f"[ARI Transfer] Failed to add destination to bridge: {response.status} {error_text}"
)
return False
# Remove external media channel from bridge
remove_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/removeChannel"
async with session.post(
remove_url, auth=auth, params={"channel": ext_channel_id}
) as response:
if response.status in (200, 204):
logger.info(
f"[ARI Transfer] Removed external media {ext_channel_id} from bridge {bridge_id}"
)
else:
error_text = await response.text()
logger.error(
f"[ARI Transfer] Failed to remove external media from bridge: {response.status} {error_text}"
)
# Hang up the external media channel
hangup_url = f"{ari_endpoint}/ari/channels/{ext_channel_id}"
async with session.delete(hangup_url, auth=auth) as response:
if response.status in (200, 204):
logger.info(
f"[ARI Transfer] Hung up external media channel {ext_channel_id}"
)
elif response.status == 404:
logger.debug(
f"[ARI Transfer] External media channel {ext_channel_id} already gone"
)
else:
error_text = await response.text()
logger.warning(
f"[ARI Transfer] Failed to hang up external media: {response.status} {error_text}"
)
logger.info(
f"[ARI Transfer] Bridge swap completed successfully for transfer {transfer_context.transfer_id}, "
f"caller {channel_id} connected to destination {destination_channel_id} via bridge {bridge_id}"
)
# 5. Clean up transfer context after successful completion
from api.services.telephony.call_transfer_manager import (
get_call_transfer_manager,
)
call_transfer_manager = await get_call_transfer_manager()
await call_transfer_manager.remove_transfer_context(
transfer_context.transfer_id
)
return True
except Exception as e:
logger.exception(f"Failed to execute ARI transfer: {e}")
return False
async def _find_transfer_context_for_call(self, caller_channel_id: str):
"""Find the active transfer context for this caller channel."""
try:
import redis.asyncio as aioredis
from api.constants import REDIS_URL
from api.services.telephony.transfer_event_protocol import TransferContext
# Search Redis for transfer contexts where original_call_sid matches this caller
redis = aioredis.from_url(REDIS_URL, decode_responses=True)
transfer_keys = await redis.keys("transfer:context:*")
for key in transfer_keys:
try:
context_data = await redis.get(key)
if context_data:
context = TransferContext.from_json(context_data)
if context.original_call_sid == caller_channel_id:
return context
except Exception:
continue
return None
except Exception as e:
logger.error(f"[ARI Transfer] Error finding transfer context: {e}")
return None
class ARIHangupStrategy(HangupStrategy):
"""Implements hangup for Asterisk ARI channels."""
async def execute_hangup(self, context: Dict[str, Any]) -> bool:
"""Hang up the Asterisk channel via ARI REST API."""
try:
import aiohttp
from aiohttp import BasicAuth
channel_id = context["channel_id"]
ari_endpoint = context["ari_endpoint"]
app_name = context["app_name"]
app_password = context["app_password"]
if not channel_id or not ari_endpoint:
logger.warning(
"Cannot hang up Asterisk channel: missing channel_id or ari_endpoint"
)
return False
endpoint = f"{ari_endpoint}/ari/channels/{channel_id}"
auth = BasicAuth(app_name, app_password)
async with aiohttp.ClientSession() as session:
async with session.delete(endpoint, auth=auth) as response:
if response.status in (200, 204):
logger.info(
f"Successfully terminated Asterisk channel {channel_id}"
)
return True
elif response.status == 404:
logger.debug(
f"Asterisk channel {channel_id} was already terminated"
)
return True
else:
error_text = await response.text()
logger.error(
f"Failed to terminate Asterisk channel {channel_id}: "
f"Status {response.status}, Response: {error_text}"
)
return False
except Exception as e:
logger.exception(f"Failed to hang up Asterisk channel: {e}")
return False

View file

@ -388,7 +388,6 @@ class ARIProvider(TelephonyProvider):
f"(timeout: {timeout}s)"
)
# Import here to avoid circular dependency
from api.services.telephony.call_transfer_manager import (
get_call_transfer_manager,
)
@ -456,7 +455,7 @@ class ARIProvider(TelephonyProvider):
}
except Exception as e:
logger.error(f"[ARI Transfer] Failed to originate transfer channel: {e}")
logger.error(f"[ARI Transfer] Failed to originate call transfer destination channel: {e}")
await call_transfer_manager.remove_transfer_context(transfer_id)
raise

View file

@ -0,0 +1,186 @@
"""Twilio-specific call operation strategies.
This module contains the business logic for Twilio call operations,
maintaining proper separation of concerns between protocol handling and business logic.
"""
from typing import Any, Dict
import aiohttp
from loguru import logger
from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy
class TwilioConferenceStrategy(TransferStrategy):
"""Implements conference-based call transfer for Twilio.
This strategy transfers calls by placing them into a Twilio conference,
with cleanup of transfer contexts upon successful completion.
"""
async def execute_transfer(self, context: Dict[str, Any]) -> bool:
"""Execute conference transfer for Twilio call."""
try:
account_sid = context["account_sid"]
auth_token = context["auth_token"]
call_sid = context["call_sid"]
region = context.get("region")
edge = context.get("edge")
# 1. Find active transfer context for this call
transfer_context = await self._find_transfer_context_for_call(call_sid)
if not transfer_context:
logger.error(
f"[Twilio Transfer] No active transfer context found for call {call_sid}"
)
return False
logger.info(
f"[Twilio Transfer] Found transfer context: {transfer_context.transfer_id}, "
f"original: {transfer_context.original_call_sid}"
)
region_prefix = f"{region}." if region else ""
edge_prefix = f"{edge}." if edge else ""
# Twilio API endpoint for updating calls
endpoint = f"https://api.{edge_prefix}{region_prefix}twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{call_sid}.json"
# Create basic auth from account_sid and auth_token
auth = aiohttp.BasicAuth(account_sid, auth_token)
conference_name = transfer_context.conference_name
twiml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Dial>
<Conference endConferenceOnExit="true">{conference_name}</Conference>
</Dial>
</Response>"""
logger.debug(
f"[Twilio Transfer] Transferring call to conference: {conference_name}"
)
# 2. Make the POST request to transfer the call
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint, auth=auth, data={"Twiml": twiml}
) as response:
response_text = await response.text()
if response.status == 200:
logger.info(
f"[Twilio Transfer] Conference transfer completed successfully for call {call_sid}, "
f"joined conference {conference_name}"
)
# 3. Clean up transfer context after successful transfer
await self._cleanup_transfer_context(transfer_context.transfer_id)
return True
elif response.status == 404:
logger.error(
f"Failed to transfer Twilio call {call_sid}: Call not found (404)"
)
await self._cleanup_transfer_context(transfer_context.transfer_id)
return False
else:
logger.error(
f"Failed to transfer Twilio call {call_sid} to conference {conference_name}: "
f"Status {response.status}, Response: {response_text}"
)
await self._cleanup_transfer_context(transfer_context.transfer_id)
return False
except Exception as e:
logger.error(f"Failed to transfer Twilio call: {e}")
if transfer_context:
await self._cleanup_transfer_context(transfer_context.transfer_id)
return False
async def _find_transfer_context_for_call(self, call_sid: str):
"""Find the active transfer context for this call."""
try:
import redis.asyncio as aioredis
from api.constants import REDIS_URL
from api.services.telephony.transfer_event_protocol import TransferContext
# Search Redis for transfer contexts where original_call_sid matches
redis = aioredis.from_url(REDIS_URL, decode_responses=True)
transfer_keys = await redis.keys("transfer:context:*")
for key in transfer_keys:
try:
context_data = await redis.get(key)
if context_data:
context = TransferContext.from_json(context_data)
if context.original_call_sid == call_sid:
return context
except Exception:
continue
return None
except Exception as e:
logger.error(f"[Twilio Transfer] Error finding transfer context: {e}")
return None
async def _cleanup_transfer_context(self, transfer_id: str):
"""Clean up transfer context after completion or failure."""
try:
from api.services.telephony.call_transfer_manager import (
get_call_transfer_manager,
)
call_transfer_manager = await get_call_transfer_manager()
await call_transfer_manager.remove_transfer_context(transfer_id)
except Exception as e:
logger.error(f"[Twilio Transfer] Error cleaning up transfer context: {e}")
class TwilioHangupStrategy(HangupStrategy):
"""Implements hangup for Twilio calls."""
async def execute_hangup(self, context: Dict[str, Any]) -> bool:
"""Hang up the Twilio call via REST API."""
try:
account_sid = context["account_sid"]
auth_token = context["auth_token"]
call_sid = context["call_sid"]
region = context.get("region")
edge = context.get("edge")
if not account_sid or not auth_token or not call_sid:
logger.warning(
"Cannot hang up Twilio call: missing required credentials or call_sid"
)
return False
region_prefix = f"{region}." if region else ""
edge_prefix = f"{edge}." if edge else ""
endpoint = f"https://api.{edge_prefix}{region_prefix}twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{call_sid}.json"
auth = aiohttp.BasicAuth(account_sid, auth_token)
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint, auth=auth, data={"Status": "completed"}
) as response:
if response.status == 200:
logger.info(f"Successfully terminated Twilio call {call_sid}")
return True
elif response.status == 404:
logger.debug(f"Twilio call {call_sid} was already terminated")
return True
else:
response_text = await response.text()
logger.error(
f"Failed to terminate Twilio call {call_sid}: "
f"Status {response.status}, Response: {response_text}"
)
return False
except Exception as e:
logger.exception(f"Failed to hang up Twilio call: {e}")
return False

View file

@ -488,7 +488,7 @@ class CustomToolManager:
finally:
# Cleanup hold music and pipeline state
# Transfer context cleanup is now handled by respective serializers
# Transfer context cleanup is handled by respective transfer call strategies
logger.info(
"Transfer wait ended, cleaning up hold music and pipeline state"
)

View file

@ -59,52 +59,52 @@ services:
networks:
- app-network
# nginx:
# image: nginx:alpine
# container_name: nginx_https
# profiles: ["remote"]
# depends_on:
# - ui
# ports:
# - "80:80"
# - "443:443"
# volumes:
# - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
# - ./certs:/etc/nginx/certs:ro
# networks:
# - app-network
nginx:
image: nginx:alpine
container_name: nginx_https
profiles: ["remote"]
depends_on:
- ui
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
- ./certs:/etc/nginx/certs:ro
networks:
- app-network
# api:
# image: ${REGISTRY:-dograhai}/dograh-api:latest
# volumes:
# - shared-tmp:/tmp
# environment:
# # Core application config
# ENVIRONMENT: "local"
# LOG_LEVEL: "INFO"
api:
image: ${REGISTRY:-dograhai}/dograh-api:latest
volumes:
- shared-tmp:/tmp
environment:
# Core application config
ENVIRONMENT: "local"
LOG_LEVEL: "INFO"
# # Replace this environment variable if you are using a custom
# # domain to host the stack
# BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}"
# Replace this environment variable if you are using a custom
# domain to host the stack
BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}"
# # Database configuration (using containerized postgres)
# DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres"
# Database configuration (using containerized postgres)
DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres"
# # Redis configuration (using containerized redis)
# REDIS_URL: "redis://:redissecret@redis:6379"
# Redis configuration (using containerized redis)
REDIS_URL: "redis://:redissecret@redis:6379"
# # Storage configuration - using local MinIO
# ENABLE_AWS_S3: "false"
# Storage configuration - using local MinIO
ENABLE_AWS_S3: "false"
# # MinIO
# MINIO_ENDPOINT: "minio:9000"
# MINIO_ACCESS_KEY: "minioadmin"
# MINIO_SECRET_KEY: "minioadmin"
# MINIO_BUCKET: "voice-audio"
# MINIO_SECURE: "false"
# MinIO
MINIO_ENDPOINT: "minio:9000"
MINIO_ACCESS_KEY: "minioadmin"
MINIO_SECRET_KEY: "minioadmin"
MINIO_BUCKET: "voice-audio"
MINIO_SECURE: "false"
# # FastAPI workers count
# FASTAPI_WORKERS: 1
# FastAPI workers count
FASTAPI_WORKERS: 1
# Langfuse
ENABLE_TRACING: "false"
@ -112,36 +112,36 @@ services:
# LANGFUSE_PUBLIC_KEY: ""
# LANGFUSE_HOST: ""
# # TURN server configuration (for WebRTC NAT traversal in remote server)
# # Uses time-limited credentials via TURN REST API (HMAC-SHA1)
# TURN_HOST: "${TURN_HOST:-}"
# TURN_SECRET: "${TURN_SECRET:-}"
# TURN server configuration (for WebRTC NAT traversal in remote server)
# Uses time-limited credentials via TURN REST API (HMAC-SHA1)
TURN_HOST: "${TURN_HOST:-}"
TURN_SECRET: "${TURN_SECRET:-}"
# OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}"
OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}"
# ports:
# - "8000:8000"
# depends_on:
# postgres:
# condition: service_healthy
# redis:
# condition: service_healthy
# minio:
# condition: service_healthy
# cloudflared:
# condition: service_started
# healthcheck:
# test:
# [
# "CMD-SHELL",
# 'python -c "import urllib.request; urllib.request.urlopen(''http://localhost:8000/api/v1/health'').read()"',
# ]
# interval: 30s
# timeout: 10s
# retries: 3
# start_period: 60s
# networks:
# - app-network
ports:
- "8000:8000"
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
minio:
condition: service_healthy
cloudflared:
condition: service_started
healthcheck:
test:
[
"CMD-SHELL",
'python -c "import urllib.request; urllib.request.urlopen(''http://localhost:8000/api/v1/health'').read()"',
]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
networks:
- app-network
ui:
image: ${REGISTRY:-dograhai}/dograh-ui:latest
@ -182,32 +182,32 @@ services:
networks:
- app-network
# coturn:
# image: coturn/coturn:4.8.0
# container_name: coturn
# restart: unless-stopped
# profiles: ["remote"]
# ports:
# - "3478:3478/udp"
# - "3478:3478/tcp"
# - "5349:5349/udp"
# - "5349:5349/tcp"
# - "49152-49200:49152-49200/udp"
# volumes:
# - ./turnserver.conf:/etc/coturn/turnserver.conf:ro
# command:
# - -c
# - /etc/coturn/turnserver.conf
# networks:
# - app-network
coturn:
image: coturn/coturn:4.8.0
container_name: coturn
restart: unless-stopped
profiles: ["remote"]
ports:
- "3478:3478/udp"
- "3478:3478/tcp"
- "5349:5349/udp"
- "5349:5349/tcp"
- "49152-49200:49152-49200/udp"
volumes:
- ./turnserver.conf:/etc/coturn/turnserver.conf:ro
command:
- -c
- /etc/coturn/turnserver.conf
networks:
- app-network
volumes:
postgres_data:
redis_data:
minio-data:
driver: local
# shared-tmp:
# driver: local
shared-tmp:
driver: local
networks:
app-network: