From 1349654c75d43bc06bbaeadbab1989a808eeb10b Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Sun, 15 Feb 2026 12:45:58 +0530 Subject: [PATCH] chore: remove old files --- api/app.py | 37 +- api/enums.py | 2 +- api/routes/stasis_rtp.py | 45 -- api/services/pipecat/audio_config.py | 5 +- api/services/pipecat/run_pipeline.py | 53 -- api/services/pipecat/service_factory.py | 2 +- api/services/pipecat/transport_setup.py | 47 -- api/services/telephony/ari_client.py | 765 ------------------ api/services/telephony/ari_client_manager.py | 437 ---------- .../telephony/ari_client_singleton.py | 50 -- api/services/telephony/ari_manager.py | 749 ----------------- .../telephony/ari_manager_connection.py | 323 -------- .../telephony/stasis_event_protocol.py | 184 ----- api/services/telephony/stasis_rtp_client.py | 315 -------- .../telephony/stasis_rtp_connection.py | 191 ----- .../telephony/stasis_rtp_serializer.py | 116 --- .../telephony/stasis_rtp_transport.py | 300 ------- api/services/telephony/test_asyncari_ping.py | 105 --- api/services/telephony/test_real_ping.py | 83 -- .../telephony/worker_event_subscriber.py | 371 --------- api/services/workflow/pipecat_engine.py | 21 - pipecat | 2 +- ui/src/constants/workflowRunModes.ts | 1 - 23 files changed, 6 insertions(+), 4198 deletions(-) delete mode 100644 api/routes/stasis_rtp.py delete mode 100644 api/services/telephony/ari_client.py delete mode 100644 api/services/telephony/ari_client_manager.py delete mode 100644 api/services/telephony/ari_client_singleton.py delete mode 100644 api/services/telephony/ari_manager.py delete mode 100644 api/services/telephony/ari_manager_connection.py delete mode 100644 api/services/telephony/stasis_event_protocol.py delete mode 100644 api/services/telephony/stasis_rtp_client.py delete mode 100644 api/services/telephony/stasis_rtp_connection.py delete mode 100644 api/services/telephony/stasis_rtp_serializer.py delete mode 100644 api/services/telephony/stasis_rtp_transport.py delete mode 100644 api/services/telephony/test_asyncari_ping.py delete mode 100644 api/services/telephony/test_real_ping.py delete mode 100644 api/services/telephony/worker_event_subscriber.py diff --git a/api/app.py b/api/app.py index dbe0a0d..a0dacd3 100644 --- a/api/app.py +++ b/api/app.py @@ -2,7 +2,7 @@ import sentry_sdk -from api.constants import DEPLOYMENT_MODE, ENABLE_TELEMETRY, REDIS_URL, SENTRY_DSN +from api.constants import DEPLOYMENT_MODE, ENABLE_TELEMETRY, SENTRY_DSN from api.logging_config import ENVIRONMENT, setup_logging # Set up logging and get the listener for cleanup @@ -21,62 +21,27 @@ if SENTRY_DSN and ( from contextlib import asynccontextmanager -from typing import Optional -import redis.asyncio as aioredis from fastapi import APIRouter, FastAPI from fastapi.middleware.cors import CORSMiddleware from loguru import logger from api.routes.main import router as main_router -from api.services.telephony.worker_event_subscriber import ( - WorkerEventSubscriber, - setup_worker_subscriber, -) from api.tasks.arq import get_arq_redis API_PREFIX = "/api/v1" -# Global reference to worker subscriber for graceful shutdown -worker_subscriber_instance: Optional[WorkerEventSubscriber] = None - @asynccontextmanager async def lifespan(app: FastAPI): - global worker_subscriber_instance - # warmup arq pool await get_arq_redis() - # Setup Redis connection for distributed mode - redis = await aioredis.from_url(REDIS_URL, decode_responses=True) - - # Setup worker subscriber (ARI Manager runs separately) - worker_subscriber = await setup_worker_subscriber(redis) - worker_subscriber_instance = worker_subscriber - - # Store worker ID in app state for health check - app.state.worker_id = worker_subscriber.worker_id - app.state.worker_subscriber = worker_subscriber - yield # Run app # Shutdown sequence - this runs when FastAPI is shutting down logger.info("Starting graceful shutdown...") - # First, try graceful shutdown with timeout - if worker_subscriber: - try: - # Check if we should do graceful shutdown (e.g., if SIGTERM was received) - # For now, we'll attempt graceful shutdown for all shutdowns - await worker_subscriber.graceful_shutdown(max_wait_seconds=300) - except Exception as e: - logger.error(f"Error during graceful shutdown: {e}") - # Fall back to immediate stop - await worker_subscriber.stop() - - await redis.aclose() - app = FastAPI( title="Dograh API", diff --git a/api/enums.py b/api/enums.py index 053a06e..fca1dd1 100644 --- a/api/enums.py +++ b/api/enums.py @@ -22,12 +22,12 @@ class WorkflowRunMode(Enum): VONAGE = "vonage" VOBIZ = "vobiz" CLOUDONIX = "cloudonix" - STASIS = "stasis" WEBRTC = "webrtc" SMALLWEBRTC = "smallwebrtc" # Historical, not used anymore. Don't # use and don't remove + STASIS = "stasis" VOICE = "VOICE" CHAT = "CHAT" diff --git a/api/routes/stasis_rtp.py b/api/routes/stasis_rtp.py deleted file mode 100644 index 0fede33..0000000 --- a/api/routes/stasis_rtp.py +++ /dev/null @@ -1,45 +0,0 @@ -import random - -from loguru import logger - -from api.db import db_client -from api.enums import WorkflowRunMode -from api.services.pipecat.run_pipeline import run_pipeline_ari_stasis -from api.services.telephony.stasis_rtp_connection import StasisRTPConnection -from pipecat.utils.run_context import set_current_run_id - - -async def on_stasis_call(call: StasisRTPConnection, call_context_vars: dict): - workflow_id = call_context_vars.get("workflow_id") or call_context_vars.get( - "campaign_id" - ) - user_id = call_context_vars.get("user_id") - - assert workflow_id is not None - assert user_id is not None - - try: - workflow_id = int(workflow_id) - user_id = int(user_id) - except ValueError: - logger.error(f"Invalid workflow ID or user ID: {workflow_id} or {user_id}") - return - - workflow_run_name = f"WR-ARI-{random.randint(1000, 9999)}" - workflow_run = await db_client.create_workflow_run( - workflow_run_name, workflow_id, WorkflowRunMode.STASIS.value, user_id - ) - - set_current_run_id(workflow_run.id) - - # Store the workflow_run_id in the connection for later use - call.workflow_run_id = workflow_run.id - - # Connect channelID with Workflow run ID in logs - logger.info( - f"channelID: {call.caller_channel_id} run_id: {workflow_run.id} " - f"Received call for workflow ID {workflow_id}, user ID {user_id}" - ) - await run_pipeline_ari_stasis( - call, workflow_id, workflow_run.id, user_id, call_context_vars - ) diff --git a/api/services/pipecat/audio_config.py b/api/services/pipecat/audio_config.py index 6bb0e8c..f2ff2d5 100644 --- a/api/services/pipecat/audio_config.py +++ b/api/services/pipecat/audio_config.py @@ -87,18 +87,17 @@ def create_audio_config(transport_type: str) -> AudioConfig: """Create audio configuration based on transport type. Args: - transport_type: Type of transport ("webrtc", "twilio", "vonage", "vobiz", "cloudonix", "stasis") + transport_type: Type of transport ("webrtc", "twilio", "vonage", "vobiz", "cloudonix") Returns: AudioConfig instance with appropriate settings """ if transport_type in ( - WorkflowRunMode.STASIS.value, WorkflowRunMode.TWILIO.value, WorkflowRunMode.VOBIZ.value, WorkflowRunMode.CLOUDONIX.value, ): - # Twilio, Cloudonix, Vobiz, and Stasis use MULAW at 8kHz + # Twilio, Cloudonix, and Vobiz use MULAW at 8kHz return AudioConfig( transport_in_sample_rate=8000, transport_out_sample_rate=8000, diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 191e6cc..6b7cb4c 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -33,14 +33,12 @@ from api.services.pipecat.service_factory import ( from api.services.pipecat.tracing_config import setup_pipeline_tracing from api.services.pipecat.transport_setup import ( create_cloudonix_transport, - create_stasis_transport, create_twilio_transport, create_vobiz_transport, create_vonage_transport, create_webrtc_transport, ) from api.services.pipecat.ws_sender_registry import get_ws_sender -from api.services.telephony.stasis_rtp_connection import StasisRTPConnection from api.services.workflow.dto import ReactFlowDTO from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow import WorkflowGraph @@ -364,52 +362,6 @@ async def run_pipeline_smallwebrtc( ) -async def run_pipeline_ari_stasis( - stasis_connection: StasisRTPConnection, - workflow_id: int, - workflow_run_id: int, - user_id: int, - call_context_vars: dict, -) -> None: - """Run pipeline for ARI connections""" - logger.debug( - f"Running pipeline for ARI connection with workflow_id: {workflow_id} and workflow_run_id: {workflow_run_id}" - ) - set_current_run_id(workflow_run_id) - - # Get workflow to extract all pipeline configurations - workflow = await db_client.get_workflow(workflow_id, user_id) - vad_config = None - ambient_noise_config = None - if workflow and workflow.workflow_configurations: - if "vad_configuration" in workflow.workflow_configurations: - vad_config = workflow.workflow_configurations["vad_configuration"] - if "ambient_noise_configuration" in workflow.workflow_configurations: - ambient_noise_config = workflow.workflow_configurations[ - "ambient_noise_configuration" - ] - - # Create audio configuration for Stasis - audio_config = create_audio_config(WorkflowRunMode.STASIS.value) - - transport = create_stasis_transport( - stasis_connection, - workflow_run_id, - audio_config, - vad_config, - ambient_noise_config, - ) - await _run_pipeline( - transport, - workflow_id, - workflow_run_id, - user_id, - call_context_vars=call_context_vars, - audio_config=audio_config, - stasis_connection=stasis_connection, # Pass connection for immediate transfers - ) - - async def _run_pipeline( transport, workflow_id: int, @@ -417,7 +369,6 @@ async def _run_pipeline( user_id: int, call_context_vars: dict = {}, audio_config: AudioConfig = None, - stasis_connection: Optional[StasisRTPConnection] = None, ) -> None: """ Run the pipeline with the given transport and configuration @@ -559,10 +510,6 @@ async def _run_pipeline( engine.set_context(context) engine.set_audio_config(audio_config) - # Set Stasis connection for immediate transfers (if available) - if stasis_connection: - engine.set_stasis_connection(stasis_connection) - assistant_params = LLMAssistantAggregatorParams( expect_stripped_words=True, correct_aggregation_callback=engine.create_aggregation_correction_callback(), diff --git a/api/services/pipecat/service_factory.py b/api/services/pipecat/service_factory.py index a346252..ee76263 100644 --- a/api/services/pipecat/service_factory.py +++ b/api/services/pipecat/service_factory.py @@ -156,7 +156,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): Args: user_config: User configuration containing TTS settings - transport_type: Type of transport (e.g., 'stasis', 'twilio', 'webrtc') + transport_type: Type of transport (e.g., 'twilio', 'webrtc') """ logger.info( f"Creating TTS service: provider={user_config.tts.provider}, model={user_config.tts.model}" diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py index 6cee7fb..2c9ec0a 100644 --- a/api/services/pipecat/transport_setup.py +++ b/api/services/pipecat/transport_setup.py @@ -6,12 +6,6 @@ from api.constants import APP_ROOT_DIR from api.db import db_client from api.enums import OrganizationConfigurationKey from api.services.pipecat.audio_config import AudioConfig -from api.services.telephony.stasis_rtp_connection import StasisRTPConnection -from api.services.telephony.stasis_rtp_serializer import StasisRTPFrameSerializer -from api.services.telephony.stasis_rtp_transport import ( - StasisRTPTransport, - StasisRTPTransportParams, -) from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer from pipecat.serializers.twilio import TwilioFrameSerializer @@ -344,47 +338,6 @@ def create_webrtc_transport( ) -def create_stasis_transport( - stasis_connection: StasisRTPConnection, - workflow_run_id: int, - audio_config: AudioConfig, - vad_config: dict | None = None, - ambient_noise_config: dict | None = None, -): - """Create a transport for ARI connections""" - - serializer = StasisRTPFrameSerializer( - StasisRTPFrameSerializer.InputParams( - sample_rate=audio_config.transport_in_sample_rate - ) - ) - - return StasisRTPTransport( - stasis_connection, - params=StasisRTPTransportParams( - audio_in_enabled=True, - audio_out_enabled=True, - audio_out_sample_rate=audio_config.transport_out_sample_rate, - audio_in_sample_rate=audio_config.transport_in_sample_rate, - # audio_out_10ms_chunks=2, # ToDo: Check if we cant support 40 ms packets? - audio_out_mixer=( - SoundfileMixer( - sound_files={ - "office": APP_ROOT_DIR - / "assets" - / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav" - }, - default_sound="office", - volume=ambient_noise_config.get("volume", 0.3), - ) - if ambient_noise_config and ambient_noise_config.get("enabled", False) - else SilenceAudioMixer() - ), - serializer=serializer, - ), - ) - - def create_internal_transport( workflow_run_id: int, audio_config: AudioConfig, diff --git a/api/services/telephony/ari_client.py b/api/services/telephony/ari_client.py deleted file mode 100644 index a2a2a39..0000000 --- a/api/services/telephony/ari_client.py +++ /dev/null @@ -1,765 +0,0 @@ -""" -Dynamic ARI client that generates methods from Swagger/OpenAPI specification. -Pure asyncio implementation without anyio dependencies. -""" - -import asyncio -import json -from dataclasses import dataclass, field -from typing import Callable, Dict, List, Optional -from urllib.parse import urljoin - -import aiohttp -from loguru import logger - - -class SwaggerMethod: - """Represents a Swagger API method.""" - - def __init__( - self, client: "AsyncARIClient", path: str, method: str, operation: dict - ): - self.client = client - self.path = path - self.http_method = method.upper() - self.operation = operation - self.operation_id = operation.get("operationId", "") - self.parameters = operation.get("parameters", []) - self.description = operation.get("description", "") - - def _build_path(self, **kwargs) -> str: - """Build the actual path by substituting path parameters.""" - path = self.path - - # Replace path parameters like {channelId} with actual values - for param in self.parameters: - # Swagger spec uses 'paramType' not 'in' - if param.get("paramType", param.get("in")) == "path": - param_name = param["name"] - if param_name in kwargs: - path = path.replace(f"{{{param_name}}}", str(kwargs[param_name])) - - return path - - def _build_params(self, **kwargs) -> dict: - """Extract query parameters from kwargs.""" - params = {} - - for param in self.parameters: - # Swagger spec uses 'paramType' not 'in' - if param.get("paramType", param.get("in")) == "query": - param_name = param["name"] - if param_name in kwargs: - params[param_name] = kwargs[param_name] - - return params - - def _build_body(self, **kwargs) -> dict: - """Extract body parameters from kwargs.""" - body = {} - - for param in self.parameters: - # Swagger 1.2 uses 'paramType' = 'body' for body parameters - if param.get("paramType", param.get("in")) == "body": - param_name = param["name"] - if param_name in kwargs: - # In Swagger 1.2, body param is usually the whole body - return ( - kwargs[param_name] - if isinstance(kwargs[param_name], dict) - else {param_name: kwargs[param_name]} - ) - - return body - - async def __call__(self, **kwargs): - """Execute the API method.""" - path = self._build_path(**kwargs) - params = self._build_params(**kwargs) - - # Check if there's a body parameter defined in the spec - body_data = self._build_body(**kwargs) - - # If no body param in spec, use remaining kwargs for body (backward compat) - if not body_data: - # Remove path and query parameters from kwargs (leaving body params) - # Swagger spec uses 'paramType' not 'in' - path_param_names = { - p["name"] - for p in self.parameters - if p.get("paramType", p.get("in")) == "path" - } - query_param_names = { - p["name"] - for p in self.parameters - if p.get("paramType", p.get("in")) == "query" - } - body_param_names = { - p["name"] - for p in self.parameters - if p.get("paramType", p.get("in")) == "body" - } - body_data = { - k: v - for k, v in kwargs.items() - if k not in path_param_names - and k not in query_param_names - and k not in body_param_names - } - - # Debug logging for externalMedia - if "externalMedia" in path: - logger.debug( - f"externalMedia call - method: {self.http_method}, path: {path}, params: {params}" - ) - - if self.http_method == "GET": - return await self.client.api_get(path, **params) - elif self.http_method == "POST": - return await self.client.api_post( - path, json_data=body_data if body_data else None, **params - ) - elif self.http_method == "PUT": - return await self.client.api_put( - path, json_data=body_data if body_data else None, **params - ) - elif self.http_method == "DELETE": - return await self.client.api_delete(path, **params) - else: - raise ValueError(f"Unsupported HTTP method: {self.http_method}") - - -class ResourceAPI: - """Represents a resource API (like channels, bridges, etc.).""" - - def __init__(self, client: "AsyncARIClient", resource_name: str): - self.client = client - self.resource_name = resource_name - self._methods = {} - - def add_method(self, method_name: str, swagger_method: SwaggerMethod): - """Add a method to this resource.""" - self._methods[method_name] = swagger_method - - def __getattr__(self, name): - """Dynamically return methods.""" - if name in self._methods: - return self._methods[name] - raise AttributeError(f"'{self.resource_name}' has no method '{name}'") - - -@dataclass -class Channel: - """Channel model with dynamic method support.""" - - id: str - name: str = "" - state: str = "" - caller: Dict[str, str] = field(default_factory=dict) - connected: Dict[str, str] = field(default_factory=dict) - accountcode: str = "" - dialplan: Dict[str, str] = field(default_factory=dict) - creationtime: str = "" - language: str = "en" - - # Store reference to client for method calls - _client: Optional["AsyncARIClient"] = field(default=None, repr=False) - - @classmethod - def from_dict(cls, data: dict, client=None) -> "Channel": - """Create Channel from API response.""" - channel = cls( - id=data.get("id", ""), - name=data.get("name", ""), - state=data.get("state", ""), - caller=data.get("caller", {}), - connected=data.get("connected", {}), - accountcode=data.get("accountcode", ""), - dialplan=data.get("dialplan", {}), - creationtime=data.get("creationtime", ""), - language=data.get("language", "en"), - _client=client, - ) - return channel - - async def continueInDialplan( - self, - context: str = None, - extension: str = None, - priority: int = None, - label: str = None, - ): - """Continue channel in dialplan.""" - if not self._client: - raise RuntimeError("Channel not associated with a client") - - params = {"channelId": self.id} - if context: - params["context"] = context - if extension: - params["extension"] = extension - if priority is not None: - params["priority"] = priority - if label: - params["label"] = label - - # The ARI API method is named 'continueInDialplan' - channels_api = self._client.channels - if hasattr(channels_api, "continueInDialplan"): - await channels_api.continueInDialplan(**params) - else: - # Fallback to direct API call - await self._client.api_post(f"/channels/{self.id}/continue", **params) - - async def hangup(self, reason: str = "normal"): - """Hangup the channel.""" - if not self._client: - raise RuntimeError("Channel not associated with a client") - await self._client.channels.hangup(channelId=self.id, reason=reason) - - async def answer(self): - """Answer the channel.""" - if not self._client: - raise RuntimeError("Channel not associated with a client") - await self._client.channels.answer(channelId=self.id) - - async def getChannelVar(self, variable: str): - """Get a channel variable.""" - if not self._client: - raise RuntimeError("Channel not associated with a client") - return await self._client.channels.getChannelVar( - channelId=self.id, variable=variable - ) - - -@dataclass -class Bridge: - """Bridge model with dynamic method support.""" - - id: str - technology: str = "" - bridge_type: str = "" - bridge_class: str = "" - creator: str = "" - name: str = "" - channels: List[str] = field(default_factory=list) - - _client: Optional["AsyncARIClient"] = field(default=None, repr=False) - - @classmethod - def from_dict(cls, data: dict, client=None) -> "Bridge": - """Create Bridge from API response.""" - return cls( - id=data.get("id", ""), - technology=data.get("technology", ""), - bridge_type=data.get("bridge_type", ""), - bridge_class=data.get("bridge_class", ""), - creator=data.get("creator", ""), - name=data.get("name", ""), - channels=data.get("channels", []), - _client=client, - ) - - async def addChannel(self, channel: str): - """Add channel to bridge.""" - if not self._client: - raise RuntimeError("Bridge not associated with a client") - await self._client.bridges.addChannel(bridgeId=self.id, channel=channel) - - async def removeChannel(self, channel: str): - """Remove channel from bridge.""" - if not self._client: - raise RuntimeError("Bridge not associated with a client") - await self._client.bridges.removeChannel(bridgeId=self.id, channel=channel) - - async def destroy(self): - """Destroy the bridge.""" - if not self._client: - raise RuntimeError("Bridge not associated with a client") - await self._client.bridges.destroy(bridgeId=self.id) - - -class AsyncARIClient: - """ARI client that dynamically generates methods from Swagger spec.""" - - def __init__(self, base_url: str, username: str, password: str, app: str): - self.base_url = base_url.rstrip("/") - self.username = username - self.password = password - self.app = app - - # REST API URL - self.api_url = self.base_url.replace("ws://", "http://").replace( - "wss://", "https://" - ) - - # WebSocket URL - self.ws_url = ( - f"{self.base_url}/ari/events?app={app}&api_key={username}:{password}" - ) - - # Session and WebSocket - self._session: Optional[aiohttp.ClientSession] = None - self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None - self._running = False - - # Event handling - self._event_handlers: Dict[str, List[Callable]] = {} - self._event_queue: asyncio.Queue = asyncio.Queue(maxsize=1000) - - # Resource APIs (will be populated from Swagger) - self.channels: Optional[ResourceAPI] = None - self.bridges: Optional[ResourceAPI] = None - self.endpoints: Optional[ResourceAPI] = None - self.recordings: Optional[ResourceAPI] = None - self.sounds: Optional[ResourceAPI] = None - self.playbacks: Optional[ResourceAPI] = None - self.asterisk: Optional[ResourceAPI] = None - self.applications: Optional[ResourceAPI] = None - self.deviceStates: Optional[ResourceAPI] = None - self.mailboxes: Optional[ResourceAPI] = None - - # Swagger spec cache - self._swagger_spec: Optional[dict] = None - - async def connect(self): - """Connect to ARI and load Swagger spec.""" - # Create HTTP session - auth = aiohttp.BasicAuth(self.username, self.password) - self._session = aiohttp.ClientSession(auth=auth) - - try: - # Load Swagger spec and generate methods - await self._load_swagger_spec() - - # Connect WebSocket - self._websocket = await self._session.ws_connect( - self.ws_url, heartbeat=30, autoping=True - ) - self._running = True - logger.info(f"Connected to ARI at {self.ws_url}") - - except Exception as e: - await self._session.close() - raise Exception(f"Failed to connect to ARI: {e}") - - async def _load_swagger_spec(self): - """Load Swagger spec and generate dynamic methods.""" - spec_loaded = False - try: - # Get Swagger spec from ARI - url = f"{self.api_url}/ari/api-docs/resources.json" - async with self._session.get(url) as resp: - resp.raise_for_status() - resources = await resp.json() - - # Store the spec - self._swagger_spec = resources - - # Create resource APIs - for api_info in resources.get("apis", []): - resource_path = api_info["path"] - - # Fix the path - remove .{format} and add proper prefix - resource_path = resource_path.replace(".{format}", ".json") - - # Load detailed spec for this resource - # The resource_path already contains /api-docs/, so we just need the base URL - url = f"{self.api_url}/ari{resource_path}" - try: - async with self._session.get(url) as resp: - resp.raise_for_status() - spec = await resp.json() - - self._process_swagger_spec(spec) - spec_loaded = True - except Exception as e: - logger.warning(f"Failed to load spec for {resource_path}: {e}") - - if spec_loaded: - logger.info("Loaded Swagger spec and generated dynamic methods") - else: - raise Exception("No individual specs could be loaded") - - except Exception as e: - logger.warning(f"Failed to load Swagger spec, using fallback methods: {e}") - self._create_fallback_methods() - - def _process_swagger_spec(self, spec: dict): - """Process a Swagger spec and create dynamic methods.""" - # basePath is available in spec but not currently used - - for api in spec.get("apis", []): - path = api["path"] - - for operation in api.get("operations", []): - self._create_method_from_operation(path, operation) - - def _create_method_from_operation(self, path: str, operation: dict): - """Create a method from a Swagger operation.""" - # Swagger spec uses 'httpMethod' not 'method' - method = operation.get("httpMethod", operation.get("method", "GET")) - operation_id = operation.get("nickname", "") - - if not operation_id: - return - - # Determine resource from path (e.g., /channels/{channelId} -> channels) - path_parts = path.strip("/").split("/") - if path_parts: - resource_name = path_parts[0] - - # Create resource API if it doesn't exist - if not hasattr(self, resource_name) or getattr(self, resource_name) is None: - setattr(self, resource_name, ResourceAPI(self, resource_name)) - - resource_api = getattr(self, resource_name) - - # Extract method name from operation ID - # e.g., "channels_continue" -> "continue_" - # or "channels_get" -> "get" - method_name = operation_id - if method_name.startswith(resource_name + "_"): - method_name = method_name[len(resource_name) + 1 :] - - # Handle special cases - if method_name == "continue": - method_name = "continue_" # Avoid Python keyword - - # Create and add the method - swagger_method = SwaggerMethod(self, path, method, operation) - resource_api.add_method(method_name, swagger_method) - - def _create_fallback_methods(self): - """Create fallback methods if Swagger spec is not available.""" - # Create basic resource APIs - self.channels = ResourceAPI(self, "channels") - self.bridges = ResourceAPI(self, "bridges") - - # Add essential channel methods - self.channels.add_method( - "get", - SwaggerMethod( - self, - "/channels/{channelId}", - "GET", - { - "operationId": "get", - "parameters": [{"name": "channelId", "in": "path"}], - }, - ), - ) - self.channels.add_method( - "hangup", - SwaggerMethod( - self, - "/channels/{channelId}", - "DELETE", - { - "operationId": "hangup", - "parameters": [ - {"name": "channelId", "in": "path"}, - {"name": "reason", "in": "query"}, - ], - }, - ), - ) - self.channels.add_method( - "answer", - SwaggerMethod( - self, - "/channels/{channelId}/answer", - "POST", - { - "operationId": "answer", - "parameters": [{"name": "channelId", "in": "path"}], - }, - ), - ) - self.channels.add_method( - "continueInDialplan", - SwaggerMethod( - self, - "/channels/{channelId}/continue", - "POST", - { - "operationId": "continueInDialplan", - "parameters": [ - {"name": "channelId", "in": "path"}, - {"name": "context", "in": "query"}, - {"name": "extension", "in": "query"}, - {"name": "priority", "in": "query"}, - {"name": "label", "in": "query"}, - ], - }, - ), - ) - self.channels.add_method( - "externalMedia", - SwaggerMethod( - self, - "/channels/externalMedia", - "POST", - { - "operationId": "externalMedia", - "parameters": [ - {"name": "channelId", "in": "query"}, # Add channelId parameter - {"name": "app", "in": "query"}, - {"name": "external_host", "in": "query"}, - {"name": "format", "in": "query"}, - {"name": "encapsulation", "in": "query"}, - {"name": "transport", "in": "query"}, - {"name": "connection_type", "in": "query"}, - {"name": "direction", "in": "query"}, - ], - }, - ), - ) - self.channels.add_method( - "getChannelVar", - SwaggerMethod( - self, - "/channels/{channelId}/variable", - "GET", - { - "operationId": "getChannelVar", - "parameters": [ - {"name": "channelId", "in": "path"}, - {"name": "variable", "in": "query"}, - ], - }, - ), - ) - - # Add essential bridge methods - self.bridges.add_method( - "get", - SwaggerMethod( - self, - "/bridges/{bridgeId}", - "GET", - { - "operationId": "get", - "parameters": [{"name": "bridgeId", "in": "path"}], - }, - ), - ) - self.bridges.add_method( - "create", - SwaggerMethod( - self, - "/bridges", - "POST", - { - "operationId": "create", - "parameters": [ - {"name": "type", "in": "query"}, - {"name": "name", "in": "query"}, - ], - }, - ), - ) - self.bridges.add_method( - "addChannel", - SwaggerMethod( - self, - "/bridges/{bridgeId}/addChannel", - "POST", - { - "operationId": "addChannel", - "parameters": [ - {"name": "bridgeId", "in": "path"}, - {"name": "channel", "in": "query"}, - ], - }, - ), - ) - self.bridges.add_method( - "removeChannel", - SwaggerMethod( - self, - "/bridges/{bridgeId}/removeChannel", - "POST", - { - "operationId": "removeChannel", - "parameters": [ - {"name": "bridgeId", "in": "path"}, - {"name": "channel", "in": "query"}, - ], - }, - ), - ) - self.bridges.add_method( - "destroy", - SwaggerMethod( - self, - "/bridges/{bridgeId}", - "DELETE", - { - "operationId": "destroy", - "parameters": [{"name": "bridgeId", "in": "path"}], - }, - ), - ) - - async def disconnect(self): - """Disconnect from ARI.""" - self._running = False - - if self._websocket: - await self._websocket.close() - - if self._session: - await self._session.close() - - async def run(self): - """Main event loop.""" - if not self._websocket: - raise RuntimeError("Not connected") - - processor_task = asyncio.create_task(self._process_events()) - - try: - async for msg in self._websocket: - if msg.type == aiohttp.WSMsgType.TEXT: - try: - event = json.loads(msg.data) - # Wrap channel/bridge objects - if "channel" in event and isinstance(event["channel"], dict): - event["channel"] = Channel.from_dict(event["channel"], self) - if "bridge" in event and isinstance(event["bridge"], dict): - event["bridge"] = Bridge.from_dict(event["bridge"], self) - await self._event_queue.put(event) - except json.JSONDecodeError: - logger.error(f"Invalid JSON: {msg.data}") - - elif msg.type == aiohttp.WSMsgType.ERROR: - logger.error(f"WebSocket error: {self._websocket.exception()}") - break - - elif msg.type == aiohttp.WSMsgType.CLOSED: - logger.info("WebSocket closed") - break - - finally: - self._running = False - processor_task.cancel() - await asyncio.gather(processor_task, return_exceptions=True) - - async def _process_events(self): - """Process events from queue.""" - while self._running: - try: - event = await asyncio.wait_for(self._event_queue.get(), timeout=1.0) - event_type = event.get("type") - if event_type: - await self._dispatch_event(event_type, event) - except asyncio.TimeoutError: - continue - except asyncio.CancelledError: - break - except Exception as e: - logger.error(f"Error processing event: {e}") - - async def _dispatch_event(self, event_type: str, event: dict): - """Dispatch event to handlers.""" - handlers = self._event_handlers.get(event_type, []) - if handlers: - logger.debug( - f"AsyncARIClient: Dispatching {event_type} to {len(handlers)} handlers" - ) - for i, handler in enumerate(handlers): - try: - logger.debug( - f" AsyncARIClient: Calling {event_type} handler {i + 1}/{len(handlers)}" - ) - await handler(event) - except Exception as e: - logger.error(f"Handler {i + 1} error for {event_type}: {e}") - - def on_event(self, event_type: str, handler: Callable): - """Register event handler.""" - if event_type not in self._event_handlers: - self._event_handlers[event_type] = [] - logger.debug( - f"AsyncARIClient: Registering handler for {event_type}. Current count: {len(self._event_handlers.get(event_type, []))}" - ) - self._event_handlers[event_type].append(handler) - logger.debug( - f"AsyncARIClient: After registration, {event_type} handler count: {len(self._event_handlers[event_type])}" - ) - - # REST API methods - async def api_get(self, path: str, **params) -> dict: - """GET request.""" - # Ensure path starts with /ari if not already - if not path.startswith("/ari"): - path = f"/ari{path}" if path.startswith("/") else f"/ari/{path}" - url = urljoin(self.api_url, path.lstrip("/")) - async with self._session.get(url, params=params) as resp: - resp.raise_for_status() - data = await resp.json() - # Wrap known objects - if isinstance(data, list): - # Handle lists of channels/bridges - if "/channels" in path: - return [ - Channel.from_dict(item, self) - if isinstance(item, dict) - else item - for item in data - ] - elif "/bridges" in path: - return [ - Bridge.from_dict(item, self) if isinstance(item, dict) else item - for item in data - ] - return data - elif isinstance(data, dict): - if "/channels/" in path and "id" in data: - return Channel.from_dict(data, self) - elif "/bridges/" in path and "id" in data: - return Bridge.from_dict(data, self) - return data - - async def api_post(self, path: str, json_data: dict = None, **params) -> dict: - """POST request.""" - # Ensure path starts with /ari if not already - if not path.startswith("/ari"): - path = f"/ari{path}" if path.startswith("/") else f"/ari/{path}" - url = urljoin(self.api_url, path.lstrip("/")) - async with self._session.post(url, json=json_data, params=params) as resp: - resp.raise_for_status() - if resp.content_length and resp.content_length > 0: - data = await resp.json() - # Wrap known objects - if "id" in data and "state" in data: - return Channel.from_dict(data, self) - elif "id" in data and "bridge_type" in data: - return Bridge.from_dict(data, self) - return data - return {} - - async def api_put(self, path: str, json_data: dict = None, **params) -> dict: - """PUT request.""" - # Ensure path starts with /ari if not already - if not path.startswith("/ari"): - path = f"/ari{path}" if path.startswith("/") else f"/ari/{path}" - url = urljoin(self.api_url, path.lstrip("/")) - async with self._session.put(url, json=json_data, params=params) as resp: - resp.raise_for_status() - if resp.content_length and resp.content_length > 0: - return await resp.json() - return {} - - async def api_delete(self, path: str, **params) -> dict: - """DELETE request.""" - # Ensure path starts with /ari if not already - if not path.startswith("/ari"): - path = f"/ari{path}" if path.startswith("/") else f"/ari/{path}" - url = urljoin(self.api_url, path.lstrip("/")) - async with self._session.delete(url, params=params) as resp: - resp.raise_for_status() - if resp.content_length and resp.content_length > 0: - return await resp.json() - return {} diff --git a/api/services/telephony/ari_client_manager.py b/api/services/telephony/ari_client_manager.py deleted file mode 100644 index c47fcd4..0000000 --- a/api/services/telephony/ari_client_manager.py +++ /dev/null @@ -1,437 +0,0 @@ -""" -ARI Client Manager using the new Async ARI Client. -Drop-in replacement for the existing ari_client_manager.py. -""" - -import asyncio -import json -import os -import random -import time -from typing import Awaitable, Callable, Optional - -import httpx -from loguru import logger - -from api.services.telephony.ari_client import AsyncARIClient, Channel -from api.services.telephony.ari_client_singleton import ari_client_singleton - - -class ARIClientManager: - """Manages ARI client connection and event handling. - - This is a compatibility wrapper around AsyncARIClient. - """ - - def __init__( - self, - ari_client: AsyncARIClient, - app_endpoint: str, - _conn_ctx=None, # Not used with AsyncARIClient - ): - """Initialize the ARI client manager. - - Parameters - ---------- - ari_client: AsyncARIClient - The connected ARI client. - app_endpoint: str - The app endpoint for external media. - _conn_ctx: - Not used, kept for compatibility. - """ - self._ari_client = ari_client - self._app_endpoint = app_endpoint - self._conn_ctx = _conn_ctx # Not used but kept for compatibility - self._start_handlers = [] - self._end_handlers = [] - self._running = False - self._handlers_registered = False # Track if handlers are registered - - def register_start_handler( - self, handler: Callable[[Channel, dict], Awaitable[None]] - ): - """Register a handler for StasisStart events.""" - logger.debug( - f"Registering start handler. Current count: {len(self._start_handlers)}" - ) - self._start_handlers.append(handler) - logger.debug(f"After registration, handler count: {len(self._start_handlers)}") - - def register_end_handler(self, handler: Callable[[str], Awaitable[None]]): - """Register a handler for StasisEnd events.""" - self._end_handlers.append(handler) - - async def update_client(self, new_client: AsyncARIClient, new_conn_ctx=None): - """Update to a new client (for reconnection).""" - logger.info("Updating ARI client for reconnection") - self._ari_client = new_client - self._conn_ctx = new_conn_ctx - # Clear old event handlers from the client before re-registering - # to prevent duplicate handler registrations - if hasattr(new_client, "_event_handlers"): - new_client._event_handlers.clear() - # Re-register event handlers - self._register_handlers() - - def _register_handlers(self): - """Register event handlers with the client.""" - logger.debug( - f"_register_handlers called. Start handlers count: {len(self._start_handlers)}, End handlers count: {len(self._end_handlers)}" - ) - - async def on_stasis_start(event): - """Handle StasisStart events.""" - channel = event.get("channel") - - # Only handle PJSIP and SIP channels - if channel and hasattr(channel, "name"): - if not ( - channel.name.startswith("PJSIP") or channel.name.startswith("SIP") - ): - logger.debug( - f"Ignoring StasisStart for non-SIP channel: {channel.name}" - ) - return - - # Log the event - logger.info( - f"StasisStart event for channel: {channel.id if channel else 'unknown'}" - ) - - # Extract call context variables - call_context_vars = {} - try: - # Get channel variables - var_result = await channel.getChannelVar( - variable="LOCAL_ARI_CALL_VARIABLES" - ) - call_context_vars = json.loads(var_result.get("value", "{}")) - - # Try to get phone number and fetch additional data - phone_number = call_context_vars.get("phone") - ari_data_uri = os.getenv("ARI_DATA_FETCHING_URI") - - if phone_number and ari_data_uri: - try: - start_time = time.time() - fetch_url = f"{ari_data_uri}{phone_number}" - - async with httpx.AsyncClient() as client: - response = await client.get(fetch_url, timeout=10.0) - response.raise_for_status() - - # Parse the response - get the latest line if multiple lines - response_text = response.text.strip() - if response_text: - lines = response_text.split("\n") - latest_line = lines[-1].strip() - - if latest_line: - # Parse the pipe-delimited data - fields = latest_line.split("|") - field_names = [ - "status", - "user", - "vendor_lead_code", - "source_id", - "list_id", - "gmt_offset_now", - "phone_code", - "phone_number", - "title", - "first_name", - "middle_initial", - "last_name", - "address1", - "address2", - "address3", - "city", - "state", - "province", - "postal_code", - "country_code", - "gender", - "date_of_birth", - "alt_phone", - "email", - "security_phrase", - "comments", - "called_count", - "last_local_call_time", - "rank", - "owner", - "entry_list_id", - "lead_id", - ] - - # Map fields to call_context_vars - for i, field_name in enumerate(field_names): - try: - call_context_vars[field_name] = fields[i] - except IndexError: - logger.error( - f"channelID: {channel.id} IndexError while accessing fields {i}" - ) - - elapsed_time = time.time() - start_time - logger.info( - f"channelID: {channel.id} Successfully fetched user details for phone: {phone_number} in {elapsed_time:.3f} seconds" - ) - - except Exception as e: - elapsed_time = time.time() - start_time - logger.error( - f"channelID: {channel.id} Failed to fetch user details from ARI_DATA_FETCHING_URI after {elapsed_time:.3f} seconds: {e}" - ) - - logger.debug( - f"channelID: {channel.id} call context variables: {call_context_vars}" - ) - - except ( - KeyError, - AttributeError, - httpx.HTTPStatusError, - json.JSONDecodeError, - ) as e: - logger.debug(f"could not find variable LOCAL_ARI_CALL_VARIABLES: {e}") - - # Call all registered handlers with call_context_vars - logger.debug( - f"Calling {len(self._start_handlers)} start handlers for channel {channel.id}" - ) - for i, handler in enumerate(self._start_handlers): - try: - logger.debug( - f" Calling start handler {i + 1}/{len(self._start_handlers)}" - ) - await handler(channel, call_context_vars) - except Exception as e: - logger.error(f"Error in StasisStart handler {i + 1}: {e}") - - async def on_stasis_end(event): - """Handle StasisEnd events.""" - channel = event.get("channel", {}) - channel_id = channel.id if hasattr(channel, "id") else channel.get("id", "") - - # # Only handle PJSIP and SIP channels - # if channel: - # channel_name = channel.name if hasattr(channel, 'name') else channel.get("name", "") - # if channel_name and not (channel_name.startswith("PJSIP") or channel_name.startswith("SIP")): - # logger.debug(f"Ignoring StasisEnd for non-SIP channel: {channel_name}") - # return - - logger.info(f"StasisEnd event for channel: {channel_id}") - - # Call all registered handlers - for handler in self._end_handlers: - try: - await handler(channel_id) - except Exception as e: - logger.error(f"Error in StasisEnd handler: {e}") - - # Register with the AsyncARIClient - logger.debug(f"Registering StasisStart and StasisEnd with AsyncARIClient") - self._ari_client.on_event("StasisStart", on_stasis_start) - self._ari_client.on_event("StasisEnd", on_stasis_end) - logger.debug(f"Event handlers registered with client") - - async def run(self): - """Run the event loop. - - The actual WebSocket handling is done by AsyncARIClient. - This just registers handlers and waits. - """ - logger.debug("Running ARIClientManager") - self._running = True - # Register handlers only once, on first run - if not self._handlers_registered: - self._register_handlers() - self._handlers_registered = True - - try: - # The AsyncARIClient.run() method handles WebSocket - # We don't call it here as it's called by the supervisor - while self._running: - await asyncio.sleep(1) - except asyncio.CancelledError: - logger.debug(f"ARIClientManager run cancelled") - self._running = False - raise - finally: - self._running = False - - -class _ARIClientManagerSupervisor: - """Supervisor that maintains ARI connection with automatic reconnection. - - This replaces the asyncari-based supervisor with AsyncARIClient. - """ - - # Reconnection parameters - _INITIAL_BACKOFF = 1 # Start with 1 second - _MAX_BACKOFF = 60 # Max 60 seconds between retries - - def __init__( - self, - on_channel_start: Callable[[Channel, dict], Awaitable[None]], - on_channel_end: Optional[Callable[[str], Awaitable[None]]] = None, - ): - self._on_channel_start = on_channel_start - self._on_channel_end = on_channel_end - self._shutting_down = False - - async def start(self): - """Start the supervisor and maintain connection.""" - await self._runner() - - async def stop(self): - """Stop the supervisor.""" - logger.info("Stopping ARI Client Manager Supervisor") - self._shutting_down = True - - async def __aenter__(self): - """Async context manager entry.""" - asyncio.create_task(self.start()) - return self - - async def __aexit__(self, *args): - """Async context manager exit.""" - await self.stop() - - async def _runner(self): - """Main reconnection loop using AsyncARIClient.""" - backoff = self._INITIAL_BACKOFF - ari_client_manager: Optional[ARIClientManager] = None - - while not self._shutting_down: - client = None - - try: - logger.debug("Going to connect with ARI") - - # Get configuration from environment - base_url = os.getenv("ARI_STASIS_ENDPOINT") - username = os.getenv("ARI_STASIS_USER") - password = os.getenv("ARI_STASIS_USER_PASSWORD") - app = os.getenv("ARI_STASIS_APP_NAME") - - # Convert HTTP to WebSocket URL - ws_url = base_url.replace("http://", "ws://").replace( - "https://", "wss://" - ) - - # Create and connect the AsyncARIClient - client = AsyncARIClient(ws_url, username, password, app) - await client.connect() - - # Update the singleton with the new client - ari_client_singleton.set_client(client) - - if ari_client_manager is None: - # First connection - create new manager - logger.debug("Creating new ARIClientManager (first connection)") - ari_client_manager = ARIClientManager( - client, - os.getenv("ARI_STASIS_APP_ENDPOINT"), - _conn_ctx=None, # Not needed with AsyncARIClient - ) - logger.debug(f"Registering handlers with new manager") - ari_client_manager.register_start_handler(self._on_channel_start) - if self._on_channel_end: - ari_client_manager.register_end_handler(self._on_channel_end) - else: - # Reconnection - update existing manager - logger.debug("Updating existing ARIClientManager (reconnection)") - # Don't re-register start and end handlers as they're already registered - await ari_client_manager.update_client(client, None) - - logger.info("Connected to ARI — supervisor entering event loop") - - # Reset backoff after successful connection - backoff = self._INITIAL_BACKOFF - - # Create tasks for both the client and manager - client_task = asyncio.create_task(client.run()) - manager_task = asyncio.create_task(ari_client_manager.run()) - - # Wait for either to complete (likely due to disconnection) - done, pending = await asyncio.wait( - {client_task, manager_task}, return_when=asyncio.FIRST_COMPLETED - ) - - # Cancel the other task - for task in pending: - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - - except asyncio.CancelledError: - # Check if we're shutting down - if self._shutting_down or asyncio.current_task().cancelled(): - logger.debug("ARI supervisor task cancelled — shutting down") - break - - # Otherwise it's a transient connection error - logger.warning("ARI connection lost due to CancelledError — will retry") - - # Force a context switch to reset event loop state - await asyncio.sleep(0) - - except Exception as exc: - # Check if we're shutting down - if self._shutting_down or asyncio.current_task().cancelled(): - logger.warning("Exiting due to shutdown during exception handling") - break - - # Log and retry - logger.warning(f"ARI connection failed or lost: {exc!r} - will retry") - - finally: - # Disconnect client if connected - if client: - try: - await client.disconnect() - except Exception as e: - logger.warning(f"Error disconnecting client: {e}") - # Clear the singleton when disconnecting - ari_client_singleton.clear() - - # Check if we're shutting down before sleeping - if self._shutting_down: - logger.debug("Exiting reconnection loop due to shutdown") - break - - # Exponential back-off with jitter before the next attempt - jitter = random.uniform(0.1, backoff) - logger.debug(f"Waiting {jitter:.1f} seconds before reconnecting...") - - # Sleep with proper event loop handling - await asyncio.sleep(0) # Yield control first - await asyncio.sleep(jitter) - - logger.debug(f"Finished sleeping for {jitter} seconds") - backoff = min(backoff * 2, self._MAX_BACKOFF) - logger.debug(f"New backoff value: {backoff}, continuing loop...") - - -async def setup_ari_client_supervisor( - on_channel_start: Callable[[Channel, dict], Awaitable[None]], - on_channel_end: Callable[[str], Awaitable[None]] | None = None, -) -> "_ARIClientManagerSupervisor | None": - """Start a background supervisor that keeps the ARI connection alive. - - This is a drop-in replacement for the asyncari-based function. - Uses AsyncARIClient instead of asyncari. - """ - logger.info("Starting ARI Client Supervisor with AsyncARIClient") - - supervisor = _ARIClientManagerSupervisor(on_channel_start, on_channel_end) - - # Start the supervisor in the background - asyncio.create_task(supervisor.start()) - - return supervisor diff --git a/api/services/telephony/ari_client_singleton.py b/api/services/telephony/ari_client_singleton.py deleted file mode 100644 index 8ebc5fe..0000000 --- a/api/services/telephony/ari_client_singleton.py +++ /dev/null @@ -1,50 +0,0 @@ -"""Singleton holder for the current ARI client instance. - -This module provides a thread-safe singleton that holds the current -ARI client instance, which can be updated during reconnections. -""" - -from typing import Optional - -from loguru import logger - -from api.services.telephony.ari_client import AsyncARIClient - - -class ARIClientSingleton: - """Singleton holder for the current ARI client instance.""" - - _instance: Optional["ARIClientSingleton"] = None - _client: Optional[AsyncARIClient] = None - - def __new__(cls): - """Ensure only one instance exists.""" - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance - - def set_client(self, client: AsyncARIClient) -> None: - """Update the ARI client instance. - - Args: - client: The new ARI client instance. - """ - self._client = client - logger.info("ARI client singleton updated with new client instance") - - def get_client(self) -> Optional[AsyncARIClient]: - """Get the current ARI client instance. - - Returns: - The current ARI client, or None if not set. - """ - return self._client - - def clear(self) -> None: - """Clear the current client instance.""" - self._client = None - logger.info("ARI client singleton cleared") - - -# Global singleton instance -ari_client_singleton = ARIClientSingleton() diff --git a/api/services/telephony/ari_manager.py b/api/services/telephony/ari_manager.py deleted file mode 100644 index 78e4ed3..0000000 --- a/api/services/telephony/ari_manager.py +++ /dev/null @@ -1,749 +0,0 @@ -"""Standalone ARI Manager Service for distributed architecture. - -This service maintains the single WebSocket connection to Asterisk ARI -and distributes events to multiple FastAPI workers via Redis pub/sub. - -ARIManager creates an instance of ARIClientSupervisor and registers the callbacks -on_channel_start and on_channel_end. It is responsible to take in caller_channel -and setup ARIManagerConnection, i.e create bridge for externalMedia. - -""" - -import asyncio -import json -import os -import signal -import time -from typing import Dict, Optional - -from api.constants import ENABLE_ARI_STASIS, REDIS_URL - -# --- Add logging setup before importing loguru --- -from api.logging_config import setup_logging -from api.services.telephony.stasis_event_protocol import ( - BaseWorkerToARIManagerCommand, - DisconnectCommand, - RedisChannels, - RedisKeys, - SocketClosedCommand, - StasisEndEvent, - StasisStartEvent, - TransferCommand, - parse_command, -) - -setup_logging() - -import redis.asyncio as aioredis -import redis.exceptions -from loguru import logger - -from api.services.telephony.ari_client import Channel -from api.services.telephony.ari_client_manager import ( - ARIClientManager, - setup_ari_client_supervisor, -) -from api.services.telephony.ari_manager_connection import ARIManagerConnection -from pipecat.utils.enums import EndTaskReason - - -class ARIManager: - """Manages ARI connection and distributes events to workers via Redis.""" - - def __init__(self, redis_client: aioredis.Redis): - self.redis = redis_client - self.stasis_manager: Optional[ARIClientManager] = None - self._running = False - self._ari_client_supervisor = None - self._tasks: Dict[str, asyncio.Task] = {} - self._pubsubs: Dict[ - str, aioredis.client.PubSub - ] = {} # Track pubsub connections - self._active_channels: set[str] = ( - set() - ) # Track channels managed by this instance - self._port_range = range(4000, 5000, 2) # Even ports only - self._channel_connections: Dict[ - str, ARIManagerConnection - ] = {} # Track connections by channel ID - self._channel_disposed: Dict[str, bool] = {} # Track channel disposed state - self._socket_closed: Dict[str, bool] = {} # Track socket closed state - self._active_workers: list[str] = [] # Cached list of active workers - self._worker_discovery_task: Optional[asyncio.Task] = None - self._channel_to_worker: Dict[str, str] = {} # Map channel to worker - - async def on_channel_start(self, caller_channel: Channel, call_context_vars: dict): - """Handle new channel from ARIClientManager with atomically allocated port.""" - try: - # Atomically allocate port for this channel (prevents race conditions) - port = await self._get_and_allocate_port_atomic(caller_channel.id) - - # Create connection with allocated port - connection = ARIManagerConnection( - caller_channel=caller_channel, - host=os.getenv("ARI_STASIS_APP_ENDPOINT"), - port=port, - ) - - # Track the connection - self._channel_connections[caller_channel.id] = connection - # Initialize channel state flags - self._channel_disposed[caller_channel.id] = False - self._socket_closed[caller_channel.id] = False - - # Handle the connection - await self._on_stasis_call(connection, call_context_vars) - - except Exception as e: - logger.exception(f"Error handling new channel {caller_channel.id}: {e}") - # Release port if allocation failed - await self._release_port_for_channel(caller_channel.id) - - async def on_channel_end(self, channel_id: str): - """Handle channel end notification from ARIClientManager.""" - logger.info(f"channelID: {channel_id} Received channel end notification") - - # Find the connection for this channel - connection = None - caller_channel_id = None - - # Check if it's a caller channel - if channel_id in self._channel_connections: - connection = self._channel_connections[channel_id] - caller_channel_id = channel_id - else: - # TODO: We are currently not handling StasisEnd on ExternalMedia - for conn_channel_id, conn in self._channel_connections.items(): - if conn.em_channel_id and conn.em_channel_id == channel_id: - logger.debug( - f"channelID: {channel_id} ExternalMedia StasisEnd - Ignoring" - ) - # connection = conn - # caller_channel_id = conn_channel_id - break - - # Publish StasisEnd event to worker immediately - if connection and caller_channel_id: - worker_id = self._get_worker_for_channel(caller_channel_id) - event = StasisEndEvent( - channel_id=caller_channel_id, - reason=EndTaskReason.USER_HANGUP.value, - ) - await self.redis.publish( - RedisChannels.worker_events(worker_id), event.to_json() - ) - logger.info(f"channelID: {channel_id} Published StasisEnd event") - - # Notify the connection about channel end - await connection.notify_channel_end() - - # Mark channel as disposed - if caller_channel_id in self._channel_disposed: - self._channel_disposed[caller_channel_id] = True - # Check if both flags are set to cleanup - await self._check_and_cleanup_channel(caller_channel_id) - - async def _on_stasis_call( - self, connection: ARIManagerConnection, call_context_vars: dict - ): - """Handle new Stasis call by setting up the connection and publishing to Redis.""" - try: - # Setup the connection (create bridge and external media) - await connection.setup_call() - - if not connection.is_connected(): - logger.warning("Connection is not connected, skipping") - return - - # Extract all necessary information after bridge is created - channel_id = connection.caller_channel_id - em_channel_id = connection.em_channel_id - bridge_id = connection.bridge_id - - # Track this channel as active - self._active_channels.add(channel_id) - - # Create event with all connection details - event = StasisStartEvent( - channel_id=channel_id, - caller_channel_id=channel_id, - em_channel_id=em_channel_id, - bridge_id=bridge_id, - local_addr=list(connection.local_addr), - remote_addr=list(connection.remote_addr) - if connection.remote_addr - else None, - call_context_vars=call_context_vars, - ) - - # Select worker using round-robin - worker_id = await self._select_worker() - if worker_id is None: - logger.error(f"channelID: {channel_id} No active workers available") - await connection.disconnect() - return - - # Track channel to worker mapping - self._channel_to_worker[channel_id] = worker_id - channel = RedisChannels.worker_events(worker_id) - - # Publish event to specific worker - await self.redis.publish(channel, event.to_json()) - logger.info( - f"channelID: {channel_id} Published stasis_start event to worker {worker_id}" - ) - - # Start monitoring for commands from workers - self._tasks[channel_id] = asyncio.create_task( - self._monitor_channel_commands(channel_id, connection) - ) - - except Exception as e: - logger.exception(f"Error handling stasis call: {e}") - - async def _get_and_allocate_port_atomic(self, channel_id: str) -> int: - """Atomically find and allocate an available port using Redis Lua script. - - This method prevents race conditions by using a Lua script that executes - atomically in Redis, ensuring that two concurrent calls cannot allocate - the same port. - """ - # Lua script for atomic port allocation - lua_script = """ - local port_range_start = tonumber(ARGV[1]) - local port_range_end = tonumber(ARGV[2]) - local port_range_step = tonumber(ARGV[3]) - local channel_id = KEYS[1] - local timestamp = ARGV[4] - - -- Check if channel already has a port allocated - local existing_port = redis.call('HGET', 'channel_ports', channel_id) - if existing_port then - return tonumber(existing_port) - end - - -- Find first available port - for port = port_range_start, port_range_end, port_range_step do - local port_str = tostring(port) - local exists = redis.call('HEXISTS', 'port_channels', port_str) - if exists == 0 then - -- Atomically allocate the port - redis.call('HSET', 'channel_ports', channel_id, port) - redis.call('HSET', 'port_channels', port_str, channel_id) - redis.call('HSET', 'channel_allocation_time', channel_id, timestamp) - return port - end - end - - return -1 -- No ports available - """ - - # Execute the Lua script with port range parameters - port_start = min(self._port_range) - port_end = max(self._port_range) - port_step = self._port_range.step - timestamp = int(time.time()) - - port = await self.redis.eval( - lua_script, - 1, # Number of keys - channel_id, # KEYS[1] - port_start, # ARGV[1] - port_end, # ARGV[2] - port_step, # ARGV[3] - timestamp, # ARGV[4] - ) - - if port == -1: - # If all ports exhausted, clean up orphaned ports and retry - await self._cleanup_orphaned_ports() - - # Retry after cleanup - port = await self.redis.eval( - lua_script, 1, channel_id, port_start, port_end, port_step, timestamp - ) - - if port == -1: - raise RuntimeError( - "No available ports in configured range after cleanup" - ) - - logger.debug(f"Atomically allocated port {port} for channel {channel_id}") - return port - - async def _release_port_for_channel(self, channel_id: str): - """Atomically release port when channel ends. - - Uses a Lua script to ensure all cleanup operations happen atomically, - preventing partial cleanup or race conditions during release. - """ - lua_script = """ - local channel_id = KEYS[1] - - -- Get the port allocated to this channel - local port = redis.call('HGET', 'channel_ports', channel_id) - - if port then - -- Atomically clean up all related entries - redis.call('HDEL', 'channel_ports', channel_id) - redis.call('HDEL', 'port_channels', port) - redis.call('HDEL', 'channel_allocation_time', channel_id) - return port - end - - return nil - """ - - port = await self.redis.eval(lua_script, 1, channel_id) - - if port: - logger.debug(f"Atomically released port {port} for channel {channel_id}") - else: - logger.debug(f"No port was allocated for channel {channel_id}") - - async def _discover_workers(self): - """Periodically discover active workers from Redis.""" - try: - while self._running: - try: - # Get all worker IDs from the set - worker_ids = await self.redis.smembers(RedisKeys.workers_set()) - - # Filter to only active workers - active_workers = [] - for worker_id in worker_ids: - worker_id = ( - worker_id.decode() - if isinstance(worker_id, bytes) - else worker_id - ) - worker_key = RedisKeys.worker_active(worker_id) - worker_data = await self.redis.get(worker_key) - - if worker_data: - try: - data = json.loads(worker_data) - # Only include workers that are ready (not draining) - if data.get("status") == "ready": - active_workers.append(worker_id) - except json.JSONDecodeError: - logger.warning(f"Invalid worker data for {worker_id}") - - # Update the cached list atomically - self._active_workers = active_workers - logger.info(f"Discovered {len(active_workers)} active workers") - - except Exception as e: - logger.error(f"Error discovering workers: {e}") - - # Check every 5 seconds - await asyncio.sleep(5) - - except asyncio.CancelledError: - logger.debug("Worker discovery task cancelled") - - async def _select_worker(self) -> Optional[str]: - """Select a worker using round-robin.""" - if not self._active_workers: - return None - - # Use Redis to maintain round-robin index across restarts - try: - index = await self.redis.incr(RedisKeys.round_robin_index()) - worker_index = (index - 1) % len(self._active_workers) - return self._active_workers[worker_index] - except Exception as e: - logger.error(f"Error selecting worker: {e}") - # Fallback to first worker if Redis operation fails - return self._active_workers[0] if self._active_workers else None - - def _get_worker_for_channel(self, channel_id: str) -> str: - """Get the assigned worker for a channel (for sending commands).""" - # Return the worker ID that was assigned to this channel - return self._channel_to_worker.get(channel_id, "") - - async def _monitor_channel_commands( - self, channel_id: str, connection: ARIManagerConnection - ): - """Listen for commands from workers for this channel.""" - # TODO: Not sure if its a good idea to monitor command for every channel - # using pubsub. What happens if there are more number of calls than number - # of tcp connections redis can support? We can do something similar to - # Campaign Orchestrator, where we can subscribe to one channel and have - # commands for every channel there. - command_channel = RedisChannels.channel_commands(channel_id) - pubsub = None - - try: - pubsub = self.redis.pubsub() - await pubsub.subscribe(command_channel) - - # Store the pubsub connection for cleanup - self._pubsubs[channel_id] = pubsub - - logger.debug(f"channelID: {channel_id} Monitoring commands for channel") - - async for message in pubsub.listen(): - if message["type"] == "message": - try: - command = parse_command(message["data"]) - if command: - await self._handle_worker_command( - channel_id, command, connection - ) - else: - logger.warning( - f"Failed to parse command for {channel_id}: {message['data']}" - ) - except Exception as e: - logger.exception( - f"Error handling command for {channel_id}: {e}" - ) - - except asyncio.CancelledError: - logger.debug(f"channelID: {channel_id} Command monitor cancelled") - raise # Re-raise to maintain proper cancellation semantics - except (ConnectionError, redis.exceptions.ConnectionError) as e: - # We close the pubsub before cancelling the task. So, the code - # flow will arrive here - pass - except Exception as e: - logger.exception(f"Error in command monitor for {channel_id}: {e}") - - async def _handle_worker_command( - self, - channel_id: str, - command: BaseWorkerToARIManagerCommand, - connection: ARIManagerConnection, - ): - """Execute commands from workers.""" - if isinstance(command, DisconnectCommand): - logger.info(f"channelID: {channel_id} Worker requested disconnect") - await connection.disconnect() - - elif isinstance(command, TransferCommand): - logger.info(f"channelID: {channel_id} Worker requested transfer") - await connection.transfer(command.context) - - elif isinstance(command, SocketClosedCommand): - logger.info(f"channelID: {channel_id} Worker notified socket closed") - - # Mark socket as closed - if channel_id in self._socket_closed: - self._socket_closed[channel_id] = True - - # Release port immediately - await self._release_port_for_channel(channel_id) - - # Check if both flags are set to cleanup - await self._check_and_cleanup_channel(channel_id) - else: - logger.warning( - f"channelID: {channel_id} Received unknown command: {command}" - ) - - async def _check_and_cleanup_channel(self, channel_id: str): - """Check if both flags are set and cleanup channel if so.""" - channel_disposed = self._channel_disposed.get(channel_id, False) - socket_closed = self._socket_closed.get(channel_id, False) - - logger.debug( - f"channelID: {channel_id} Check cleanup - disposed: {channel_disposed}, socket_closed: {socket_closed}" - ) - - if channel_disposed and socket_closed: - # Remove from active channels and connections - self._active_channels.discard(channel_id) - self._channel_connections.pop(channel_id, None) - - # Close pubsub connection first (before cancelling task) - if channel_id in self._pubsubs: - pubsub = self._pubsubs[channel_id] - try: - command_channel = RedisChannels.channel_commands(channel_id) - await pubsub.unsubscribe(command_channel) - await pubsub.aclose() - logger.debug( - f"channelID: {channel_id} Closed pubsub connection in cleanup" - ) - except Exception as e: - logger.warning(f"Error closing pubsub for {channel_id}: {e}") - finally: - del self._pubsubs[channel_id] - - # Cancel command monitor task - if channel_id in self._tasks: - task = self._tasks[channel_id] - if not task.done(): - # Task is still running, cancel it - task.cancel() - try: - # Wait for task to complete - await task - logger.debug( - f"channelID: {channel_id} Task completed after cancel" - ) - except asyncio.CancelledError: - logger.debug( - f"channelID: {channel_id} Task cancelled successfully" - ) - except Exception as e: - logger.warning( - f"channelID: {channel_id} Task raised exception: {e}" - ) - else: - # Task already completed - logger.debug( - f"channelID: {channel_id} Monitor task already completed" - ) - try: - # Still await to get any exception that might have occurred - await task - except Exception as e: - logger.warning( - f"channelID: {channel_id} Completed task had exception: {e}" - ) - - del self._tasks[channel_id] - - # Clean up the flag tracking - self._channel_disposed.pop(channel_id, None) - self._socket_closed.pop(channel_id, None) - - logger.info(f"channelID: {channel_id} Completed cleanup of all resources") - - async def _cleanup_orphaned_ports(self): - """Clean up ports from previous ungraceful shutdowns.""" - try: - # Get all channel-port mappings - channel_ports = await self.redis.hgetall("channel_ports") - if not channel_ports: - return - - logger.info( - f"Found {len(channel_ports)} existing port allocations, checking for orphans..." - ) - - cleaned = 0 - current_time = int(time.time()) - max_age_seconds = 3600 # 1 hour - - # On startup, we can safely assume any existing allocations are orphaned - # since this is a fresh instance with no active channels yet - if not self._active_channels: - # Clean up all existing allocations on startup - for channel_id, port in channel_ports.items(): - allocation_time = await self.redis.hget( - "channel_allocation_time", channel_id - ) - age_str = "" - if allocation_time: - age = current_time - int(allocation_time) - age_str = f" (aged {age}s)" - - await self._release_port_for_channel(channel_id) - logger.info( - f"Cleaned up orphaned port {port} for channel {channel_id}{age_str}" - ) - cleaned += 1 - else: - # During runtime, only clean up channels not being tracked - for channel_id, port in channel_ports.items(): - if channel_id not in self._active_channels: - # Check allocation age - allocation_time = await self.redis.hget( - "channel_allocation_time", channel_id - ) - if allocation_time: - age = current_time - int(allocation_time) - if age > max_age_seconds: - # Too old, clean up regardless - await self._release_port_for_channel(channel_id) - logger.info( - f"Cleaned up stale port {port} for channel {channel_id} (aged {age}s)" - ) - cleaned += 1 - continue - - # Not tracked by this instance, might be orphaned - # For safety, only clean up if reasonably old (5 minutes) - if ( - allocation_time - and (current_time - int(allocation_time)) > 300 - ): - await self._release_port_for_channel(channel_id) - logger.info( - f"Cleaned up orphaned port {port} for untracked channel {channel_id}" - ) - cleaned += 1 - - if cleaned > 0: - logger.info(f"Cleaned up {cleaned} orphaned port allocations") - - except Exception as e: - logger.exception(f"Error during orphaned port cleanup: {e}") - - async def _periodic_cleanup(self): - """Periodically clean up orphaned ports.""" - cleanup_interval = 1800 # 30 minutes - - while self._running: - try: - await asyncio.sleep(cleanup_interval) - if self._running: # Check again after sleep - logger.info("Running periodic orphaned port cleanup...") - await self._cleanup_orphaned_ports() - except asyncio.CancelledError: - logger.debug("Periodic cleanup task cancelled") - break - except Exception as e: - logger.exception(f"Error in periodic cleanup: {e}") - - async def run(self): - """Main run loop for ARI Manager.""" - if not ENABLE_ARI_STASIS: - logger.info("ARI Stasis integration disabled via environment variable") - return - - # Setup ARI connection with supervisor - self._running = True - - try: - self._ari_client_supervisor = await setup_ari_client_supervisor( - self.on_channel_start, self.on_channel_end - ) - if not self._ari_client_supervisor: - logger.error("Failed to setup ARI connection") - return - - # Start worker discovery task - self._worker_discovery_task = asyncio.create_task(self._discover_workers()) - - # Wait a moment for initial worker discovery - await asyncio.sleep(1) - - logger.info( - f"ARI Manager started with {len(self._active_workers)} active workers" - ) - - # Clean up any orphaned ports from previous runs - await self._cleanup_orphaned_ports() - - # Start periodic cleanup task - cleanup_task = asyncio.create_task(self._periodic_cleanup()) - - # Keep running until shutdown - while self._running: - await asyncio.sleep(1) - - logger.debug("ARIManager._running is false. Will cleanup and shutdown") - - # Cancel cleanup task - cleanup_task.cancel() - try: - await cleanup_task - except asyncio.CancelledError: - pass - - except Exception as e: - logger.exception(f"ARI Manager error: {e}") - finally: - if self._ari_client_supervisor: - await self._ari_client_supervisor.close() - logger.info("ARI Manager stopped") - - async def shutdown(self): - """Graceful shutdown.""" - logger.info("Shutting down ARI Manager...") - - # Close supervisor first to prevent reconnection attempts - if self._ari_client_supervisor: - await self._ari_client_supervisor.close() - - # Cancel worker discovery task - if self._worker_discovery_task: - self._worker_discovery_task.cancel() - try: - await self._worker_discovery_task - except asyncio.CancelledError: - pass - self._worker_discovery_task = None - - # Now set running to False - self._running = False - - # Clean up all active channel ports before shutting down - if self._active_channels: - logger.info(f"Cleaning up {len(self._active_channels)} active channels...") - for channel_id in list( - self._active_channels - ): # Copy to avoid modification during iteration - await self._release_port_for_channel(channel_id) - logger.info( - f"Released port for active channel {channel_id} during shutdown" - ) - self._active_channels.clear() - - # Clear flag tracking - self._channel_disposed.clear() - self._socket_closed.clear() - - # Cancel all monitoring tasks - for task in self._tasks.values(): - task.cancel() - - # Wait for tasks to complete - if self._tasks: - await asyncio.gather(*self._tasks.values(), return_exceptions=True) - - -async def main(): - """Main entry point for ARI Manager service.""" - # Setup Redis connection - redis = await aioredis.from_url(REDIS_URL, decode_responses=True) - - # Create and run manager - manager = ARIManager(redis) - - # Create a shutdown event for clean coordination - shutdown_event = asyncio.Event() - - # Setup signal handlers - loop = asyncio.get_event_loop() - - def signal_handler(signum): - logger.info(f"Received shutdown signal {signum}") - # Set the shutdown event which will trigger shutdown - shutdown_event.set() - - for sig in (signal.SIGTERM, signal.SIGINT): - loop.add_signal_handler(sig, lambda s=sig: signal_handler(s)) - - # Run manager with shutdown monitoring - manager_task = asyncio.create_task(manager.run()) - shutdown_task = asyncio.create_task(shutdown_event.wait()) - - try: - # Wait for either normal completion or shutdown signal - done, pending = await asyncio.wait( - [manager_task, shutdown_task], return_when=asyncio.FIRST_COMPLETED - ) - - # If shutdown was triggered, perform graceful shutdown - if shutdown_task in done: - await manager.shutdown() - # Cancel the manager task if still running - if manager_task in pending: - manager_task.cancel() - try: - await manager_task - except asyncio.CancelledError: - pass - finally: - await redis.aclose() - - -if __name__ == "__main__": - # Configure logging - logger.add("logs/ari_manager.log", rotation="10 MB") - asyncio.run(main()) diff --git a/api/services/telephony/ari_manager_connection.py b/api/services/telephony/ari_manager_connection.py deleted file mode 100644 index ac77b09..0000000 --- a/api/services/telephony/ari_manager_connection.py +++ /dev/null @@ -1,323 +0,0 @@ -"""ARI-specific Stasis connection for use by ARI Manager. - -This connection has direct access to the ARI client and manages -the actual Asterisk channels, bridges, and RTP setup. -""" - -import json -import os -import uuid -from typing import Optional - -import httpx -from loguru import logger - -from api.services.telephony.ari_client import AsyncARIClient, Bridge, Channel -from api.services.telephony.ari_client_singleton import ari_client_singleton -from pipecat.utils.base_object import BaseObject - - -class ARIManagerConnection(BaseObject): - """ARI Manager's connection that directly controls Asterisk resources. - - This class is used only by the ARI Manager process and has full - access to the ARI client for creating bridges, channels, etc. - """ - - def __init__( - self, - caller_channel: Channel, - host: str, - port: int, - ) -> None: - """Initialize ARI Stasis connection. - - Args: - caller_channel: The caller's channel object. - host: Host address for RTP transport. - port: Port number for RTP transport. - """ - super().__init__() - - # External dependencies. - self._host: str = host - self._port: int = port - - # Store channel IDs instead of Channel objects to avoid stale references - self.caller_channel_id: str = caller_channel.id - self.em_channel_id: Optional[str] = None # externalMedia channel ID - - # Store bridge ID to avoid stale references after reconnection - self.bridge_id: Optional[str] = None - - # RTP addressing information - self.local_addr = ("0.0.0.0", port) - self.remote_addr = None - - # Internal state. - self._closed: bool = False - self._is_connected: bool = False - - def is_connected(self) -> bool: - """Check if the connection is established.""" - return self._is_connected and not self._closed - - @property - def _ari(self) -> Optional[AsyncARIClient]: - """Get the current ARI client from singleton.""" - return ari_client_singleton.get_client() - - async def _get_channel(self, channel_id: str) -> Optional[Channel]: - """Safely get a channel object by ID. - - Returns None if the channel doesn't exist or can't be fetched. - """ - if not channel_id: - return None - try: - # Get current client from singleton - client = self._ari - if not client: - logger.warning( - f"Cannot get channel {channel_id} - No ARI client available" - ) - return None - # Check if the session is still active - if not client._session or client._session.closed: - logger.warning( - f"Cannot get channel {channel_id} - ARI session is closed" - ) - return None - return await client.channels.get(channelId=channel_id) - except Exception as e: - logger.warning(f"Could not get channel {channel_id} - {e}") - return None - - async def _get_bridge(self, bridge_id: str) -> Optional[Bridge]: - """Safely get a bridge object by ID. - - Returns None if the bridge doesn't exist or can't be fetched. - """ - if not bridge_id: - return None - try: - # Get current client from singleton - client = self._ari - if not client: - logger.warning( - f"Cannot get bridge {bridge_id} - No ARI client available" - ) - return None - # Check if the session is still active - if not client._session or client._session.closed: - logger.warning(f"Cannot get bridge {bridge_id} - ARI session is closed") - return None - return await client.bridges.get(bridgeId=bridge_id) - except Exception as e: - logger.warning(f"Could not get bridge {bridge_id}: {e}") - return None - - async def _cleanup_resources(self): - """Clean up external media channel and bridge.""" - # Cleanup external media channel - try: - if self.em_channel_id: - em_channel = await self._get_channel(self.em_channel_id) - if em_channel: - await em_channel.hangup() - logger.debug( - f"channelID: {self.em_channel_id} Hung up external media" - ) - self.em_channel_id = None - except Exception as exc: - logger.warning( - f"Failed to hang-up externalMedia channel: {self.em_channel_id}" - f"Error: {exc}" - ) - - # Cleanup bridge - try: - if self.bridge_id: - bridge = await self._get_bridge(self.bridge_id) - if bridge: - await bridge.destroy() - logger.debug(f"bridgeID: {self.bridge_id} Destroyed bridge") - self.bridge_id = None - except Exception as exc: - logger.warning(f"Failed to destroy bridge: {self.bridge_id}Error: {exc}") - - async def _sync_call_data(self, call_transfer_context: dict): - """Sync call data to ARI_DATA_SYNCING_URI.""" - if not os.getenv("ARI_DATA_SYNCING_URI"): - return - - lead_id = call_transfer_context.get("lead_id") - status = call_transfer_context.get("disposition") - - # {'lead_id': '299154', 'disposition': 'VM', 'agent_name': 'Alex', 'decision_maker': 'False', 'employment': 'N/A', 'debts': 'N/A', 'number_of_credit_cards': 'N/A', 'time': '2025-08-07T13:16:02-04:00'} - - full_name = call_transfer_context.get("full_name", "") - phone = call_transfer_context.get("phone", "") - debts = call_transfer_context.get("debts", "") - employment = call_transfer_context.get("employment", "") - time = call_transfer_context.get("time", "") - - comment = f"Type:Qualified!NName:{full_name}!NPhone:{phone}!NDebts:{debts}!NCC:N/A!NDM:Yes!NEmployment:{employment}!NTime:{time}!NVendor Id:!NStatus:{status}" - - try: - if lead_id and status: - ari_data_uri = os.getenv("ARI_DATA_SYNCING_URI") - # Add URL params to the base URL - sync_url = f"{ari_data_uri}&lead_id={lead_id}&status={status}&comments={comment}" - - logger.debug( - f"channelID: {self.caller_channel_id} Syncing data to ARI_DATA_SYNCING_URI: {sync_url}" - ) - - async with httpx.AsyncClient() as client: - response = await client.post(sync_url, timeout=10.0) - response.raise_for_status() - logger.info( - f"channelID: {self.caller_channel_id} Successfully synced data for lead_id: {lead_id} with status: {status}" - ) - else: - logger.warning( - f"channelID: {self.caller_channel_id} Missing lead_id or status for syncing" - ) - except Exception as e: - logger.error( - f"channelID: {self.caller_channel_id} Failed to sync data to ARI_DATA_SYNCING_URI: {e}" - ) - - async def disconnect(self): - """Instruct Asterisk to hang-up the call and perform cleanup.""" - if self._closed: - return - - # Lets mark it as closed so that when we receive StasisEnd, we don't - # try to cleanup resource again - self._closed = True - - # Clean up resources first - await self._cleanup_resources() - - try: - if self.caller_channel_id: - caller_channel = await self._get_channel(self.caller_channel_id) - if caller_channel: - logger.debug( - f"channelID: {self.caller_channel_id} Hanging up caller channel" - ) - await caller_channel.hangup() - except Exception: - logger.exception("Failed to hangup caller channel") - - async def transfer(self, call_transfer_context: dict): - """Transfer the call by continuing in dialplan with extracted variables.""" - if self._closed: - return - - # Lets mark it as closed so that when we receive StasisEnd, we don't - # try to cleanup resource again - self._closed = True - - try: - # Clean up resources before transferring - await self._cleanup_resources() - - if self.caller_channel_id: - caller_channel = await self._get_channel(self.caller_channel_id) - if caller_channel: - logger.debug( - f"channelID: {self.caller_channel_id} User qualified, continuing in dialplan " - f"REMOTE_DISPO_CALL_VARIABLES: {json.dumps(call_transfer_context)}" - ) - - # Sync data to ARI_DATA_SYNCING_URI - await self._sync_call_data( - call_transfer_context=call_transfer_context - ) - - await caller_channel.continueInDialplan() - except Exception: - logger.exception("Failed to transfer caller channel") - - async def setup_call(self): - """Setup the bridge and external media channel. - - This must be called after initialization to establish the connection. - """ - await self._setup_call(self._host, self._port) - - async def _setup_call(self, host: str, port: int): - """Create externalMedia + bridge and notify that the call is connected.""" - try: - em_channel_id = str(uuid.uuid4()) - logger.debug( - f"channelID: {em_channel_id} Creating externalMedia channel on {host}:{port}" - ) - - client = self._ari - if not client: - raise RuntimeError("No ARI client available") - - em_channel = await client.channels.externalMedia( - app=client.app, - channelId=em_channel_id, - external_host=f"{host}:{port}", - format="ulaw", - direction="both", - ) - - # Store the channel ID - self.em_channel_id = em_channel.id - - # Create a mixing bridge and add both legs. - bridge = await client.bridges.create(type="mixing") - self.bridge_id = bridge.id - # Add channels individually as AsyncARIClient expects single channel per call - await bridge.addChannel(channel=self.caller_channel_id) - await bridge.addChannel(channel=self.em_channel_id) - - # TODO: Figure out how can we get the remote public IP. Till then - # just pick it from the environment variable - # Get RTP addressing information - # ip = await em_channel.getChannelVar( - # variable="UNICASTRTP_LOCAL_ADDRESS" - # ) - port = await em_channel.getChannelVar(variable="UNICASTRTP_LOCAL_PORT") - - self.remote_addr = ( - os.environ.get("ASTERISK_REMOTE_IP"), - int(port["value"]), - ) - - logger.debug( - f"channelID: {self.caller_channel_id} ARIManagerConnection connection resources ready " - f"(bridgeID: {self.bridge_id}), (emChannelID: {self.em_channel_id})" - f"remote address: {self.remote_addr}, local address: {self.local_addr}" - ) - - self._is_connected = True - - except Exception as exc: - logger.exception(f"Error setting up ARIManagerConnection: {exc}") - await self._cleanup_resources() - - async def notify_channel_end(self): - """Notify that a channel has ended. Received after we get StasisEnd on the caller channel""" - if self._closed: - return - - self._closed = True - self._is_connected = False - - # Cleanup resources using the shared method - await self._cleanup_resources() - - def __repr__(self): - """Return string representation of connection.""" - return ( - f"" - ) diff --git a/api/services/telephony/stasis_event_protocol.py b/api/services/telephony/stasis_event_protocol.py deleted file mode 100644 index 81b87e8..0000000 --- a/api/services/telephony/stasis_event_protocol.py +++ /dev/null @@ -1,184 +0,0 @@ -"""Redis communication protocol for distributed ARI architecture. - -Defines message formats and helpers for ARI Manager <-> Worker communication. -""" - -import json -from dataclasses import asdict, dataclass -from enum import Enum -from typing import Any, Dict, List, Optional - - -class EventType(str, Enum): - """Types of events sent from ARI Manager to Workers.""" - - STASIS_START = "stasis_start" - STASIS_END = "stasis_end" - CHANNEL_UPDATE = "channel_update" - ERROR = "error" - - -class CommandType(str, Enum): - """Types of commands sent from Workers to ARI Manager.""" - - DISCONNECT = "disconnect" - TRANSFER = "transfer" - UPDATE_STATE = "update_state" - SOCKET_CLOSED = "socket_closed" - - -@dataclass -class BaseWorkerToARIManagerCommand: - """Base class for all commands sent from Workers to ARI Manager.""" - - type: str - channel_id: str = "" - - def to_json(self) -> str: - return json.dumps(asdict(self)) - - @classmethod - def from_json(cls, data: str): - return cls(**json.loads(data)) - - -@dataclass -class StasisStartEvent: - """Event sent when a new call is bridged and ready.""" - - type: str = EventType.STASIS_START - channel_id: str = "" - caller_channel_id: str = "" - em_channel_id: Optional[str] = None - bridge_id: Optional[str] = None - local_addr: List[Any] = None # [host, port] - remote_addr: Optional[List[Any]] = None # [host, port] with UNICASTRTP_LOCAL_PORT - call_context_vars: Dict[str, Any] = None - - def __post_init__(self): - if self.local_addr is None: - self.local_addr = [] - if self.call_context_vars is None: - self.call_context_vars = {} - - def to_json(self) -> str: - return json.dumps(asdict(self)) - - @classmethod - def from_json(cls, data: str) -> "StasisStartEvent": - return cls(**json.loads(data)) - - -@dataclass -class StasisEndEvent: - """Event sent when a call ends.""" - - type: str = EventType.STASIS_END - channel_id: str = "" - reason: Optional[str] = None - - def to_json(self) -> str: - return json.dumps(asdict(self)) - - @classmethod - def from_json(cls, data: str) -> "StasisEndEvent": - return cls(**json.loads(data)) - - -@dataclass -class DisconnectCommand(BaseWorkerToARIManagerCommand): - """Command to disconnect a call.""" - - type: str = CommandType.DISCONNECT - reason: str = "worker_requested" - - -@dataclass -class TransferCommand(BaseWorkerToARIManagerCommand): - """Command to transfer a call.""" - - type: str = CommandType.TRANSFER - context: Dict[str, Any] = None - - def __post_init__(self): - if self.context is None: - self.context = {} - - -@dataclass -class SocketClosedCommand(BaseWorkerToARIManagerCommand): - """Command to notify that RTP sockets have been closed.""" - - type: str = CommandType.SOCKET_CLOSED - - -class RedisChannels: - """Redis channel naming conventions.""" - - @staticmethod - def worker_events(worker_id: str) -> str: - """Channel for events sent to a specific worker.""" - return f"ari:events:worker:{worker_id}" - - @staticmethod - def channel_commands(channel_id: str) -> str: - """Channel for commands related to a specific call channel.""" - return f"ari:commands:{channel_id}" - - @staticmethod - def channel_updates(channel_id: str) -> str: - """Channel for state updates about a specific call.""" - return f"ari:updates:{channel_id}" - - -class RedisKeys: - """Redis key naming conventions for worker registration and discovery.""" - - @staticmethod - def worker_active(worker_id: str) -> str: - """Key for active worker status and metadata.""" - return f"workers:active:{worker_id}" - - @staticmethod - def workers_set() -> str: - """Set containing all registered worker IDs.""" - return "workers:set" - - @staticmethod - def round_robin_index() -> str: - """Counter for round-robin worker selection.""" - return "workers:round_robin:index" - - -def parse_event(data: str) -> Any: - """Parse a Redis event message.""" - try: - parsed = json.loads(data) - event_type = parsed.get("type") - - if event_type == EventType.STASIS_START: - return StasisStartEvent(**parsed) - elif event_type == EventType.STASIS_END: - return StasisEndEvent(**parsed) - else: - return parsed - except Exception: - return None - - -def parse_command(data: str) -> Any: - """Parse a Redis command message.""" - try: - parsed = json.loads(data) - cmd_type = parsed.get("type") - - if cmd_type == CommandType.DISCONNECT: - return DisconnectCommand(**parsed) - elif cmd_type == CommandType.TRANSFER: - return TransferCommand(**parsed) - elif cmd_type == CommandType.SOCKET_CLOSED: - return SocketClosedCommand(**parsed) - else: - return parsed - except Exception: - return None diff --git a/api/services/telephony/stasis_rtp_client.py b/api/services/telephony/stasis_rtp_client.py deleted file mode 100644 index 1281b7a..0000000 --- a/api/services/telephony/stasis_rtp_client.py +++ /dev/null @@ -1,315 +0,0 @@ -"""Low-level RTP transport for Asterisk externalMedia sessions. - -stasis_rtp_client.py -~~~~~~~~~~~~~~~~~~~~ - -* Sends and receives **proper RTP/UDP** (PT 0 PCMU/μ-law). -* Uses 20 ms frames (160 bytes payload) by default; automatically - chunks or concatenates data so timestamps stay correct. -* Verifies the RTP header on the receive path (SSRC and PT). -""" - -import asyncio -import secrets -import socket -import struct -from typing import TYPE_CHECKING, AsyncIterator, Optional - -from loguru import logger - -if TYPE_CHECKING: - from api.services.telephony.stasis_rtp_connection import StasisRTPConnection - from api.services.telephony.stasis_rtp_transport import StasisRTPCallbacks - -# ─────────────────────────────────────────────────────────────────── helpers - - -_RTP_HDR = struct.Struct("!BBHII") # v/p/x/cc, m/pt, seq, ts, ssrc -_PT_PCMU = 0 # static payload type for μ-law - - -class _RTPEncoder: - """Builds PCMU RTP headers for the packets we SEND to Asterisk.""" - - def __init__(self): - self.ssrc = secrets.randbits(32) - self.seq = secrets.randbits(16) - self.ts = 0 # incremented by #payload bytes - - def pack(self, payload: bytes, mark=False) -> bytes: - b0 = 0x80 # V=2 - b1 = (0x80 if mark else 0x00) | _PT_PCMU - hdr = _RTP_HDR.pack(b0, b1, self.seq, self.ts, self.ssrc) - self.seq = (self.seq + 1) & 0xFFFF - self.ts += len(payload) # 1 sample/byte @ 8 kHz - return hdr + payload - - -class _RTPDecoder: - """Very forgiving RTP decoder. - - Latches on the first valid packet and then insists - that SSRC & PT match afterwards. Returns *None* if the packet - should be ignored. - """ - - def __init__(self): - self.peer_ssrc: int | None = None # learned from first packet - - def unpack(self, packet: bytes) -> bytes | None: - if len(packet) < _RTP_HDR.size: - return None - b0, b1, seq, ts, ssrc = _RTP_HDR.unpack_from(packet) - if (b0 & 0xC0) != 0x80: # RTP v2? - return None - if (b1 & 0x7F) != _PT_PCMU: # payload-type 0? - return None - if self.peer_ssrc is None: - self.peer_ssrc = ssrc # latch on first good packet - elif ssrc != self.peer_ssrc: - return None # stray stream – drop - return packet[_RTP_HDR.size :] - - -# ──────────────────────────────────────────────────────────────── client - - -class StasisRTPClient: - """Low-level wrapper around StasisRTPConnection. - - Public API - ────────── - • await setup(start_frame) kept for parity (does nothing) - • await connect() - • async for payload in receive(): # μ-law bytes (20 ms each) - … - • await send(data) # any length; will be chunked - • await disconnect() - """ - - _FRAME_SIZE = 160 # 20 ms @ 8 kHz PCMU - - def __init__( - self, - connection: "StasisRTPConnection", - callbacks: "StasisRTPCallbacks", - ): - """Initialize Stasis RTP client. - - Args: - connection: RTP connection parameters. - callbacks: Callback handlers for transport events. - """ - from typing import Any - - self._connection = connection - self._callbacks = callbacks - self._encoder = _RTPEncoder() - self._decoder = _RTPDecoder() - - self._recv_sock: Optional[socket.socket] = None - self._send_sock: Optional[socket.socket] = None - self._closing = False - self._recv_sock_ready = asyncio.Event() # Signal when recv socket is ready - self._leave_counter = 0 # Track input/output transport usage - - # ── wire event handlers to the connection ──────────────── - @self._connection.event_handler("connected") - async def _on_connected(_: Any): - await self._setup_sockets() - await self._callbacks.on_client_connected( - self._connection.caller_channel_id - ) - - @self._connection.event_handler("disconnected") - async def _on_disconnected(_: Any): - logger.debug("In _on_disconnected of StasisRTPClient") - await self._callbacks.on_client_disconnected( - self._connection.caller_channel_id - ) - - # ─── public helpers ────────────────────────────────────────── - - async def setup(self, _): - """Setup method for compatibility.""" - self._leave_counter += 1 - - async def connect(self): - """Connect to the RTP socket.""" - if self._connection.is_connected(): - return - await self._connection.connect() - - async def disconnect(self): - """Disconnect from the RTP socket.""" - # Decrement leave counter when disconnect is called - logger.debug(f"StasisRTPClient.disconnect leave_counter: {self._leave_counter}") - self._leave_counter -= 1 - if self._leave_counter > 0: - # Early return - InputTransport called first, OutputTransport will call later - # Only proceed when counter reaches 0 (OutputTransport's call) - return - - # Close sockets - logger.debug("Going to close sockets") - await self._close_sockets() - - if self._closing: - # We might have received the disconnected callback from the StasisRTPConnection - # due to user hangup. We will just return. We have already closed the sockets - # in disconnected callback handler. - return - self._closing = True - - # If we have initiated transfer before, we would ignore _connection.disconnect() - # in the connection. (since is_closing would be set by transfer) - try: - await self._connection.disconnect() - except Exception as exc: - logger.error(f"Failed to disconnect RTP connection: {exc}") - - # ─── socket management ────────────────────────────────────── - - async def _setup_sockets(self): - if self._recv_sock and self._send_sock: - return - - logger.debug( - f"Setting up Sockets - local {self._connection.local_addr}, remote: {self._connection.remote_addr}" - ) - - # receive socket – bind to local address provided by connection - if not self._recv_sock: - rs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - rs.setblocking(False) - rs.bind(self._connection.local_addr) - self._recv_sock = rs - self._recv_sock_ready.set() # Signal that recv socket is ready - - # send socket – connect to remote (Asterisk) address - if not self._send_sock: - ss = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - ss.setblocking(False) - ss.connect(self._connection.remote_addr) - self._send_sock = ss - - logger.debug( - f"Socket setup complete - recv_fd: {self._recv_sock.fileno()}, send_fd: {self._send_sock.fileno()}" - ) - - async def _close_sockets(self): - """Safely close sockets with proper error handling.""" - for sock_name, sock in [("recv", self._recv_sock), ("send", self._send_sock)]: - if sock: - try: - # Shutdown the socket first to break any pending operations - sock.shutdown(socket.SHUT_RDWR) - except OSError: - # Socket might already be closed or in a bad state - pass - try: - sock.close() - except Exception as exc: - logger.debug(f"Error closing {sock_name} socket: {exc}") - - self._recv_sock = None - self._send_sock = None - self._recv_sock_ready.clear() # Reset the event for potential reconnection - - # Notify the connection that sockets are closed so ARI Manager can clean up ports - await self._connection.notify_sockets_closed() - - logger.debug("Closed sockets in StasisRTPClient") - - # ─── receive path ──────────────────────────────────────────── - - async def receive(self) -> AsyncIterator[bytes]: - """Async generator yielding μ-law frames (exactly 160 bytes each). - - Silently drops any packet whose RTP header does not match our SSRC/PT. - """ - loop = asyncio.get_running_loop() - - # Wait for recv socket to be created - try: - await self._recv_sock_ready.wait() - except asyncio.CancelledError: - return - - logger.debug("Going to receive from the socket now") - - while not self._closing: - try: - # each loop gets 172 bytes UDP packet, which is 160 bytes of - # audio data (Asterisk sends 20ms audio chunks with 8k sample rate) - # and 12 bytes of RTP header - data = await loop.sock_recv(self._recv_sock, 2048) - except asyncio.CancelledError: - logger.debug("RTP receive task cancelled") - break - except (OSError, socket.error) as exc: - logger.warning(f"RTP receive failed (socket closed): {exc}") - break - except Exception as exc: - logger.debug(f"Unexpected error in receive: {exc}") - break - - payload = self._decoder.unpack(data) - if payload is None: - continue # header failed validation - - # In practice Asterisk sends 20 ms frames – assert just in case. - if len(payload) != self._FRAME_SIZE: - logger.warning(f"Dropping non-20 ms packet len={len(payload)}") - continue - yield payload - - # ─── send path ─────────────────────────────────────────────── - - async def send(self, data: bytes): - """Send μ-law data of arbitrary length. - - Splits/aggregates into 160-byte chunks before RTP-wrapping. - """ - if self._closing or not self._send_sock: - return - loop = asyncio.get_running_loop() - - # chunk/concat to 160-byte frames - chunks = self._chunk_ulaw(data, self._FRAME_SIZE) - for i, chunk in enumerate(chunks): - mark = i == 0 # set marker on the first packet of talk-spurt - packet = self._encoder.pack(chunk, mark=mark) - try: - await loop.sock_sendall(self._send_sock, packet) - except (OSError, socket.error) as exc: - logger.warning(f"RTP send failed (socket closed): {exc}") - break - except Exception as exc: - logger.error(f"RTP send failed: {exc}") - break - - def _chunk_ulaw(self, buf: bytes, size: int) -> list[bytes]: - """Split / aggregate μ-law bytes to exact *size* multiples. - - • If buf length is not a multiple of *size*, pad the last chunk with 0xFF - (silence). That keeps timestamps monotonic. - """ - if not buf: - return [] - if len(buf) % size: - pad = size - (len(buf) % size) - buf += b"\xff" * pad - return [buf[i : i + size] for i in range(0, len(buf), size)] - - # ─── properties ────────────────────────────────────────────── - - @property - def is_connected(self) -> bool: - """Check if client is connected.""" - return self._connection.is_connected() and not self._closing - - @property - def is_closing(self) -> bool: - """Check if client is closing.""" - return self._closing diff --git a/api/services/telephony/stasis_rtp_connection.py b/api/services/telephony/stasis_rtp_connection.py deleted file mode 100644 index a592f35..0000000 --- a/api/services/telephony/stasis_rtp_connection.py +++ /dev/null @@ -1,191 +0,0 @@ -"""Stasis RTP connection for worker processes - is used by stasis rtp transport. - -This connection works without direct ARI access and communicates with -the ARI Manager via Redis for all control operations. -""" - -from typing import Optional, Tuple - -import redis.asyncio as aioredis -from loguru import logger - -from api.services.telephony.stasis_event_protocol import ( - DisconnectCommand, - RedisChannels, - SocketClosedCommand, - TransferCommand, -) -from pipecat.utils.base_object import BaseObject - - -class StasisRTPConnection(BaseObject): - """Worker-side connection that communicates with ARI Manager via Redis. - - This class provides the same API as the original StasisRTPConnection but - without direct ARI client access. All channel operations are delegated - to the ARI Manager process via Redis. - """ - - _SUPPORTED_EVENTS = [ - "connecting", - "connected", - "disconnected", - "closed", - "failed", - "new", - ] - - def __init__( - self, - redis_client: aioredis.Redis, - channel_id: str, - caller_channel_id: str, - em_channel_id: Optional[str], - bridge_id: Optional[str], - local_addr: Optional[Tuple[str, int]], - remote_addr: Optional[Tuple[str, int]], - workflow_run_id: Optional[int] = None, - ): - """Initialize distributed connection with pre-established details. - - Args: - redis_client: Redis client for communication - channel_id: Primary channel ID for this connection - caller_channel_id: Caller's channel ID - em_channel_id: External media channel ID - bridge_id: Bridge ID (already created by ARI Manager) - local_addr: Local RTP address (host, port) - remote_addr: Remote RTP address with UNICASTRTP_LOCAL_PORT - workflow_run_id: Workflow run ID for logging context - """ - super().__init__() - - self.redis = redis_client - self.channel_id = channel_id - self.caller_channel_id = caller_channel_id - self.em_channel_id = em_channel_id - self.bridge_id = bridge_id - self.workflow_run_id = workflow_run_id - - # RTP addressing (same as StasisRTPConnection) - self.local_addr = local_addr - self.remote_addr = remote_addr - - # State tracking - # self._closed_by_stasis_end should only be set True after we get - # StasisEnd from the transport - self._closed_by_stasis_end = False - - # self._closing should be True if we have received disconnect - # or transfer request - self._closing = False - - self._connect_invoked = False - - # Register event handlers - for evt in self._SUPPORTED_EVENTS: - self._register_event_handler(evt) - - logger.debug( - f"channelID: {channel_id} StasisRTPConnection created: " - f"bridgeID: {bridge_id}, local_addr={local_addr}, remote_addr={remote_addr}" - ) - - async def connect(self): - """Signal readiness to start the call. - - Since the bridge is already established by ARI Manager, - we can immediately trigger the connected event. - """ - self._connect_invoked = True - if self.is_connected(): - await self._call_event_handler("connected") - else: - logger.warning( - "StasisRTPConnection is not connected - did not call connected handler" - ) - - async def disconnect(self): - """Request disconnection via Redis command to ARI Manager. Usually called - when there is a disconnect triggered by workflow""" - # If we have already received user hangup via StasisEnd, lets - # return - if self._closed_by_stasis_end or self._closing: - return - - self._closing = True - - logger.info(f"channelID: {self.channel_id} Requesting disconnect") - - # Send disconnect command to ARI Manager - command = DisconnectCommand(channel_id=self.channel_id) - channel = RedisChannels.channel_commands(self.channel_id) - await self.redis.publish(channel, command.to_json()) - - async def transfer(self, call_transfer_context: dict): - """Request call transfer via Redis command to ARI Manager.""" - # If we have already received user hangup via StasisEnd, lets - # return - if self._closed_by_stasis_end or self._closing: - return - - self._closing = True - - logger.info(f"channelID: {self.channel_id} Requesting transfer") - - # Send transfer command to ARI Manager - command = TransferCommand( - channel_id=self.channel_id, context=call_transfer_context - ) - channel = RedisChannels.channel_commands(self.channel_id) - await self.redis.publish(channel, command.to_json()) - - async def notify_sockets_closed(self): - """Notify ARI Manager that RTP sockets have been closed.""" - logger.info( - f"channelID: {self.channel_id} Notifying ARI Manager that sockets are closed" - ) - - # Send socket_closed command to ARI Manager - command = SocketClosedCommand(channel_id=self.channel_id) - channel = RedisChannels.channel_commands(self.channel_id) - await self.redis.publish(channel, command.to_json()) - - def is_connected(self) -> bool: - """Check if connection is established. - - Returns True once connect() has been called and connection is not closed. - """ - return ( - self._connect_invoked - and not self._closed_by_stasis_end - and not self._closing - ) - - async def handle_remote_disconnect(self): - """Handle disconnection initiated by ARI Manager. Is called when the user hangs up.""" - if self._closed_by_stasis_end or self._closing: - return - - self._closed_by_stasis_end = True - - if self._connect_invoked: - # Unless self._connect_invoked is True, the event handlers won't be registered. We only - # register the event handler of client when the transports are initiated during pipeline - # initialisation. Any caller must check and wait for _connect_invoked before - # calling the method - await self._call_event_handler("disconnected") - else: - logger.warning( - f"ChannelID: {self.channel_id} Got remote disconnect before connection was invoked" - ) - - logger.info(f"channelID: {self.channel_id} StasisRTPConnection disconnected") - - def __repr__(self): - """String representation of connection.""" - return ( - f"" - ) diff --git a/api/services/telephony/stasis_rtp_serializer.py b/api/services/telephony/stasis_rtp_serializer.py deleted file mode 100644 index c4caf02..0000000 --- a/api/services/telephony/stasis_rtp_serializer.py +++ /dev/null @@ -1,116 +0,0 @@ -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -"""Stasis RTP frame serializer. - -This serializer converts between Pipecat frames and the raw μ-law RTP payload -stream expected by an Stasis *External Media* channel. - -The serializer: - -* Down-samples PCM to 8-kHz μ-law for **outgoing** audio (:class:`AudioRawFrame`). -* Up-samples μ-law to the pipeline's native rate for **incoming** audio. -""" - -from typing import Optional - -from loguru import logger -from pydantic import BaseModel - -from pipecat.audio.utils import create_default_resampler, pcm_to_ulaw, ulaw_to_pcm -from pipecat.frames.frames import ( - AudioRawFrame, - Frame, - InputAudioRawFrame, - StartFrame, -) -from pipecat.serializers.base_serializer import FrameSerializer - - -class StasisRTPFrameSerializer(FrameSerializer): - """Serializer for Asterisk External Media streams (raw μ-law).""" - - class InputParams(BaseModel): - """Configuration parameters. - - Attributes: - ---------- - stasis_sample_rate : int, default 8000 - The sample-rate used by Stasis when sending μ-law (PCMU). - sample_rate : Optional[int] - Override for the pipeline's *input* sample-rate. When omitted the - value from the :class:`StartFrame` is used. - """ - - stasis_sample_rate: int = 8000 - sample_rate: Optional[int] = None - - def __init__(self, params: Optional[InputParams] = None): - """Initialize Stasis RTP frame serializer. - - Args: - params: Optional configuration parameters for the serializer. - """ - self._params = params or self.InputParams() - - # Wire / pipeline rates - self._stasis_sample_rate = self._params.stasis_sample_rate - self._sample_rate = 0 # pipeline rate, filled in *setup* - - # Resampler shared between encode / decode paths - self._resampler = create_default_resampler() - - async def setup(self, frame: StartFrame): - """Remember pipeline configuration.""" - self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate - - async def serialize(self, frame: Frame) -> bytes | str | None: - """Convert a Pipecat frame to a wire payload. - - Only :class:`AudioRawFrame` instances are translated all other frame - types are silently ignored, allowing higher-level transports to deal - with them as needed. - """ - if isinstance(frame, AudioRawFrame): - try: - # Pipeline PCM → 8-kHz μ-law - encoded = await pcm_to_ulaw( - frame.audio, - frame.sample_rate, - self._stasis_sample_rate, - self._resampler, - ) - return encoded # raw bytes - except Exception as exc: # pragma: no cover – robustness - logger.error( - f"StasisRTPFrameSerializer.serialize: encode failed: {exc}" - ) - return None - - # Non-audio frames are not transmitted on the media path - return None - - async def deserialize(self, data: bytes | str) -> Frame | None: - """Convert wire payloads to Pipecat frames. - - The Stasis media socket delivers bare μ-law bytes, therefore *data* - must be *bytes*. Any *str* is ignored. - """ - if not isinstance(data, (bytes, bytearray)): - return None - - try: - pcm = await ulaw_to_pcm( - bytes(data), - self._stasis_sample_rate, - self._sample_rate, - self._resampler, - ) - return InputAudioRawFrame( - audio=pcm, - sample_rate=self._sample_rate, - num_channels=1, - ) - except Exception as exc: # pragma: no cover - logger.error(f"StasisRTPFrameSerializer.deserialize: decode failed: {exc}") - return None diff --git a/api/services/telephony/stasis_rtp_transport.py b/api/services/telephony/stasis_rtp_transport.py deleted file mode 100644 index 20a6b64..0000000 --- a/api/services/telephony/stasis_rtp_transport.py +++ /dev/null @@ -1,300 +0,0 @@ -# transports/ari_external_media.py (new file) - -"""Stasis RTP transport for Asterisk External Media integration.""" - -import asyncio -import time -from typing import Awaitable, Callable, Optional - -from loguru import logger -from pydantic import BaseModel - -from api.services.telephony.stasis_rtp_client import StasisRTPClient -from api.services.telephony.stasis_rtp_connection import StasisRTPConnection -from pipecat.frames.frames import ( - CancelFrame, - EndFrame, - InputAudioRawFrame, - OutputAudioRawFrame, - StartFrame, - TransportMessageFrame, - TransportMessageUrgentFrame, -) -from pipecat.serializers.base_serializer import FrameSerializer -from pipecat.transports.base_input import BaseInputTransport -from pipecat.transports.base_output import BaseOutputTransport -from pipecat.transports.base_transport import BaseTransport, TransportParams - - -class StasisRTPTransportParams(TransportParams): - """Transport parameters for Stasis RTP transport.""" - - serializer: FrameSerializer - - -class StasisRTPCallbacks(BaseModel): - """Callbacks for Stasis RTP transport events.""" - - on_client_connected: Callable[[str], Awaitable[None]] - on_client_disconnected: Callable[[str], Awaitable[None]] - on_client_closed: Callable[[str], Awaitable[None]] - - -# ------------------------------------------------ Input Transport ------------------------- - -""" -Transport calls client receive to receive the audio from the socket. This happens in the self._receive_audio task. -Then the audio frames are pushed to _audio_in_queue using push_audio_frame method. Then the _audio_task_handler processes -the frames from the _audio_in_queue and pushes them to the VAD analyzer, turn analyzer and pushes the audio -further downstream to tts. - -The BaseInputTransport pipeline is responsible for: -- Resampling the audio to the correct sample rate -- Applying the audio filter -- Pushing the audio frames to the VAD analyzer -- Pushing the audio frames to the turn analyzer -- Pushing the audio frames to the bot interruption analyzer -- Pushing the audio frames down the pipeline to the tts - -stop method is called from process_frame of the BaseInputTransport. super.stop() stops _audio_task_handler. It then -calls _client.disconnect. Transport's callbacks are sent to the client using StasisRTPCallbacks. -""" - - -class StasisRTPInputTransport(BaseInputTransport): - """Input transport for receiving audio over Stasis RTP.""" - - def __init__( - self, - transport: BaseTransport, - client: StasisRTPClient, - params: StasisRTPTransportParams, - **kwargs, - ): - """Initialize Stasis RTP input transport. - - Args: - transport: Parent transport instance. - client: Stasis RTP client for socket communication. - params: Transport parameters including serializer. - **kwargs: Additional keyword arguments for BaseInputTransport. - """ - super().__init__(params, **kwargs) - self._transport = transport - self._client = client - self._params = params - - self._receive_task: Optional[asyncio.Task] = None - - async def start(self, frame: StartFrame): - """Start the input transport.""" - await super().start(frame) - - await self._client.setup(frame) - await self._params.serializer.setup(frame) - - # Ensure underlying connection is established and socket ready. - await self._client.connect() - - if not self._receive_task: - self._receive_task = self.create_task(self._receive_audio()) - - await self.set_transport_ready(frame) - - async def _stop_tasks(self): - if self._receive_task: - await self.cancel_task(self._receive_task) - self._receive_task = None - - async def stop(self, frame: EndFrame): - """Stop the input transport.""" - await super().stop(frame) - await self._stop_tasks() - await self._client.disconnect() - logger.debug("Successfully disconnected from StasisRTPClient") - - async def cancel(self, frame: CancelFrame): - """Cancel the input transport.""" - await super().cancel(frame) - await self._stop_tasks() - await self._client.disconnect() - - async def _receive_audio(self): - try: - async for payload in self._client.receive(): - frame = await self._params.serializer.deserialize(payload) - if not frame: - continue - - if isinstance(frame, InputAudioRawFrame): - await self.push_audio_frame(frame) - else: - await self.push_frame(frame) - except Exception as exc: - logger.error(f"StasisRTPInputTransport receive error: {exc}") - - # No app-messages in RTP path, but keep compatibility - async def push_app_message(self, message): - """Push app message (not supported in RTP transport).""" - logger.debug("StasisRTPInputTransport received app message ignored (RTP only)") - - -# ------------------------------------------------ Output Transport ------------------------ - - -class StasisRTPOutputTransport(BaseOutputTransport): - """Output transport for sending audio over Stasis RTP.""" - - def __init__( - self, - transport: BaseTransport, - client: StasisRTPClient, - params: StasisRTPTransportParams, - **kwargs, - ): - """Initialize Stasis RTP output transport. - - Args: - transport: Parent transport instance. - client: Stasis RTP client for socket communication. - params: Transport parameters including serializer. - **kwargs: Additional keyword arguments for BaseOutputTransport. - """ - super().__init__(params, **kwargs) - - self._transport = transport - self._client = client - self._params = params - - # Pace outgoing audio so we don't dump buffers instantly (simulate 10-ms chunks) - self._send_interval: float = 0 - self._next_send_time: float = 0 - - async def start(self, frame: StartFrame): - """Start the output transport.""" - await super().start(frame) - - await self._client.setup(frame) - await self._params.serializer.setup(frame) - - self._send_interval = self._params.audio_out_10ms_chunks * 10 / 1000 # ms - - await self.set_transport_ready(frame) - - async def stop(self, frame: EndFrame): - """Stop the output transport.""" - await super().stop(frame) - await self._client.disconnect() - - async def cancel(self, frame: CancelFrame): - """Cancel the output transport.""" - await super().cancel(frame) - await self._client.disconnect() - - async def send_message( - self, frame: TransportMessageFrame | TransportMessageUrgentFrame - ): - """Send message frame (not supported in RTP transport).""" - # RTP path has no generic message channel; ignore. - pass - - async def write_audio_frame(self, frame: OutputAudioRawFrame): - """Write audio frame to RTP stream.""" - if self._client.is_closing: - return False - - if not self._client.is_connected: - # If not connected yet, just simulate playback delay. - await self._write_audio_sleep() - return - - payload = await self._params.serializer.serialize(frame) - if payload: - await self._client.send(payload) - - await self._write_audio_sleep() - - async def _write_audio_sleep(self): - """Simulates real-time audio playback timing by introducing controlled delays. - - This method implements a clock simulation to pace audio transmission at realistic - intervals. Without this pacing, audio frames would be sent as fast as possible, - which could overwhelm receivers or cause buffering issues. - - The method: - 1. Calculates how long to sleep based on when the next frame should be sent - 2. Sleeps for the calculated duration (or 0 if we're already behind schedule) - 3. Updates _next_send_time for the next audio chunk - - The _send_interval is computed as: (audio_chunk_size / sample_rate) / 2 - This creates timing that simulates how an actual audio device would output - audio at the proper rate (e.g., every 10ms for 10ms audio chunks). - """ - current_time = time.monotonic() - sleep_duration = max(0, self._next_send_time - current_time) - await asyncio.sleep(sleep_duration) - if sleep_duration == 0: - self._next_send_time = time.monotonic() + self._send_interval - else: - self._next_send_time += self._send_interval - - -class StasisRTPTransport(BaseTransport): - """Main transport class for Stasis RTP communication.""" - - def __init__( - self, - stasis_connection: StasisRTPConnection, - params: StasisRTPTransportParams, - input_name: Optional[str] = None, - output_name: Optional[str] = None, - ): - """Initialize Stasis RTP transport. - - Args: - stasis_connection: Connection parameters for Stasis RTP. - params: Transport parameters including serializer. - input_name: Optional name for input transport. - output_name: Optional name for output transport. - """ - super().__init__(input_name=input_name, output_name=output_name) - - self._params = params - - client_callbacks = StasisRTPCallbacks( - on_client_connected=self._on_client_connected, - on_client_disconnected=self._on_client_disconnected, - on_client_closed=self._on_client_closed, - ) - self._client = StasisRTPClient(stasis_connection, client_callbacks) - - self._input = StasisRTPInputTransport( - self, self._client, self._params, name=self._input_name - ) - - self._output = StasisRTPOutputTransport( - self, self._client, self._params, name=self._output_name - ) - - # expose handlers - self._register_event_handler("on_client_connected") - self._register_event_handler("on_client_disconnected") - self._register_event_handler("on_client_closed") - - def input(self) -> StasisRTPInputTransport: - """Get the input transport.""" - return self._input - - def output(self) -> StasisRTPOutputTransport: - """Get the output transport.""" - return self._output - - # ------------------------------------------------ event adapters ---------- - async def _on_client_connected(self, chan_id: str): - await self._call_event_handler("on_client_connected", chan_id) - - async def _on_client_disconnected(self, chan_id: str): - await self._call_event_handler("on_client_disconnected", chan_id) - - async def _on_client_closed(self, chan_id: str): - await self._call_event_handler("on_client_closed", chan_id) diff --git a/api/services/telephony/test_asyncari_ping.py b/api/services/telephony/test_asyncari_ping.py deleted file mode 100644 index 8e97371..0000000 --- a/api/services/telephony/test_asyncari_ping.py +++ /dev/null @@ -1,105 +0,0 @@ -#!/usr/bin/env python3 -"""Test script to verify asyncari ping functionality.""" - -import asyncio -import os -import sys -from pathlib import Path - -# Add the asyncari src to Python path for testing -asyncari_path = Path(__file__).parent.parent.parent.parent.parent / "asyncari" / "src" -sys.path.insert(0, str(asyncari_path)) - -import asyncari -from loguru import logger - - -async def test_ping(): - """Test the ping functionality with asyncari.""" - - # Configure from environment or use defaults - base_url = os.getenv("ARI_STASIS_ENDPOINT", "http://localhost:8088") - username = os.getenv("ARI_STASIS_USER", "asterisk") - password = os.getenv("ARI_STASIS_USER_PASSWORD", "asterisk") - apps = os.getenv("ARI_STASIS_APP_NAME", "test-app") - - logger.info(f"Connecting to ARI at {base_url}") - - try: - async with asyncari.connect( - base_url=base_url, apps=apps, username=username, password=password - ) as client: - logger.info("Connected to ARI") - - # Test REST API ping - logger.info("Testing REST API ping...") - result = await client.asterisk.ping() - logger.info(f"REST API ping successful: {result}") - - # Test WebSocket ping (should work with our wrapper) - logger.info("Testing WebSocket ping...") - for ws in client.websockets: - try: - await ws.ping() - logger.info("WebSocket ping() called successfully (no-op)") - except AttributeError: - logger.error("WebSocket doesn't have ping() method") - except Exception as e: - logger.error(f"WebSocket ping failed: {e}") - - # Test the keep_alive function - from ari_client_manager import keep_alive - - logger.info("Starting keep_alive task...") - keep_alive_task = asyncio.create_task(keep_alive(client, interval=5.0)) - - # Run for 20 seconds to see several pings - await asyncio.sleep(20) - - # Cancel keep_alive - keep_alive_task.cancel() - try: - await keep_alive_task - except asyncio.CancelledError: - logger.info("keep_alive task cancelled") - - logger.info("Test completed successfully!") - - except Exception as e: - logger.exception(f"Test failed: {e}") - return False - - return True - - -async def test_with_manager(): - """Test using the ARI client manager.""" - from ari_client_manager import setup_ari_client_supervisor - - async def on_stasis_call(client, channel, context_vars): - logger.info(f"Received call: {channel.id}") - - # Enable ARI Stasis for testing - os.environ["ENABLE_ARI_STASIS"] = "true" - - supervisor = await setup_ari_client_supervisor(on_stasis_call) - - if supervisor: - logger.info("ARI Stasis supervisor started with ping support") - - # Run for 30 seconds - await asyncio.sleep(30) - - await supervisor.close() - logger.info("Supervisor closed") - else: - logger.error("Failed to start supervisor") - - -if __name__ == "__main__": - import sys - - if len(sys.argv) > 1 and sys.argv[1] == "manager": - asyncio.run(test_with_manager()) - else: - asyncio.run(test_ping()) diff --git a/api/services/telephony/test_real_ping.py b/api/services/telephony/test_real_ping.py deleted file mode 100644 index 02236ba..0000000 --- a/api/services/telephony/test_real_ping.py +++ /dev/null @@ -1,83 +0,0 @@ -#!/usr/bin/env python3 -"""Test script to verify real WebSocket ping frames are being sent.""" - -import asyncio -import os -import sys -from pathlib import Path - -# Add the asyncari src to Python path -asyncari_path = Path(__file__).parent.parent.parent.parent.parent / "asyncari" / "src" -sys.path.insert(0, str(asyncari_path)) - -import asyncari -from loguru import logger - -# Enable debug logging to see ping frames -logger.add(sys.stderr, level="DEBUG") - - -async def test_real_ping(): - """Test that real WebSocket ping frames are sent.""" - - # Configure from environment or use defaults - base_url = os.getenv("ARI_STASIS_ENDPOINT", "http://localhost:8088") - username = os.getenv("ARI_STASIS_USER", "asterisk") - password = os.getenv("ARI_STASIS_USER_PASSWORD", "asterisk") - apps = os.getenv("ARI_STASIS_APP_NAME", "test-app") - - logger.info(f"Connecting to ARI at {base_url}") - - try: - async with asyncari.connect( - base_url=base_url, apps=apps, username=username, password=password - ) as client: - logger.info("Connected to ARI") - - # Get the WebSocket - for ws in client.websockets: - logger.info(f"WebSocket type: {type(ws)}") - logger.info( - f"WebSocket wrapper active: {'WebSocketWrapper' in str(type(ws))}" - ) - - # Check internal structure - if hasattr(ws, "_websocket"): - inner_ws = ws._websocket - logger.info(f"Inner WebSocket type: {type(inner_ws)}") - logger.info(f"Has _connection: {hasattr(inner_ws, '_connection')}") - logger.info(f"Has _sock: {hasattr(inner_ws, '_sock')}") - - # Send a test ping - logger.info("Sending test ping...") - try: - await ws.ping(b"test-ping-123") - logger.info("Ping sent successfully!") - except Exception as e: - logger.error(f"Ping failed: {e}") - - # Test the keep_alive function - logger.info("\nTesting keep_alive function...") - from ari_client_manager import keep_alive - - # Run keep_alive for a short time - keep_alive_task = asyncio.create_task(keep_alive(client, interval=3.0)) - - # Let it run for 10 seconds to see multiple pings - await asyncio.sleep(10) - - # Cancel and cleanup - keep_alive_task.cancel() - try: - await keep_alive_task - except asyncio.CancelledError: - pass - - logger.info("Test completed!") - - except Exception as e: - logger.exception(f"Test failed: {e}") - - -if __name__ == "__main__": - asyncio.run(test_real_ping()) diff --git a/api/services/telephony/worker_event_subscriber.py b/api/services/telephony/worker_event_subscriber.py deleted file mode 100644 index 6126372..0000000 --- a/api/services/telephony/worker_event_subscriber.py +++ /dev/null @@ -1,371 +0,0 @@ -"""Worker Event Subscriber for distributed ARI architecture. - -This component runs in each FastAPI worker process and subscribes to -Redis events from the ARI Manager. It creates pipelines for assigned calls -without any direct ARI connection. -""" - -import asyncio -import json -import uuid -from typing import Awaitable, Callable, Optional - -import redis.asyncio as aioredis -from loguru import logger - -from api.routes.stasis_rtp import on_stasis_call -from api.services.telephony.stasis_event_protocol import ( - DisconnectCommand, - RedisChannels, - RedisKeys, - StasisEndEvent, - StasisStartEvent, - parse_event, -) -from api.services.telephony.stasis_rtp_connection import StasisRTPConnection -from pipecat.utils.run_context import set_current_run_id - - -class WorkerEventSubscriber: - """Subscribes to ARI events from Redis and processes them in the worker.""" - - def __init__( - self, - redis_client: aioredis.Redis, - on_stasis_call: Callable[[StasisRTPConnection, dict], Awaitable[None]], - ): - self.redis = redis_client - self.worker_id = str(uuid.uuid4()) # Generate unique worker ID - self.on_stasis_call = on_stasis_call - self._running = False - self._task: Optional[asyncio.Task] = None - self._heartbeat_task: Optional[asyncio.Task] = None - self._active_connections: dict[str, StasisRTPConnection] = {} - self._active_tasks: dict[str, asyncio.Task] = {} - self._cleanup_tasks: dict[str, asyncio.Task] = {} - self._shutting_down = False - self._shutdown_event = asyncio.Event() - - async def start(self): - """Start the event subscriber.""" - if self._task is None: - self._running = True - - # Register worker in Redis - await self._register_worker() - - # Start main event loop - self._task = asyncio.create_task( - self._run(), name=f"worker_subscriber_{self.worker_id}" - ) - - # Start heartbeat task - self._heartbeat_task = asyncio.create_task( - self._heartbeat_loop(), name=f"worker_heartbeat_{self.worker_id}" - ) - - logger.info(f"Worker {self.worker_id} event subscriber started") - - async def _register_worker(self): - """Register this worker in Redis.""" - worker_key = RedisKeys.worker_active(self.worker_id) - worker_data = json.dumps({"status": "ready", "active_calls": 0}) - - # Set with TTL of 30 seconds (will be refreshed by heartbeat) - await self.redis.setex(worker_key, 30, worker_data) - - # Add to workers set - await self.redis.sadd(RedisKeys.workers_set(), self.worker_id) - - logger.info(f"Worker {self.worker_id} registered in Redis") - - async def _heartbeat_loop(self): - """Send periodic heartbeats to Redis.""" - try: - while self._running: - # Update worker status with current active call count - worker_key = RedisKeys.worker_active(self.worker_id) - worker_data = json.dumps( - { - "status": "draining" if self._shutting_down else "ready", - "active_calls": len(self._active_tasks), - } - ) - - # Refresh TTL to 30 seconds - await self.redis.setex(worker_key, 30, worker_data) - - # Wait 10 seconds before next heartbeat - await asyncio.sleep(10) - - except asyncio.CancelledError: - logger.debug(f"Worker {self.worker_id} heartbeat cancelled") - except Exception as e: - logger.exception(f"Worker {self.worker_id} heartbeat error: {e}") - - async def graceful_shutdown(self, max_wait_seconds: int = 300): - """Gracefully shutdown the worker, waiting for calls to complete. - - Args: - max_wait_seconds: Maximum time to wait for calls to complete (default 5 minutes) - """ - logger.info(f"Worker {self.worker_id} starting graceful shutdown") - - # Mark as shutting down to prevent new calls - self._shutting_down = True - - # Update status in Redis to 'draining' - worker_key = RedisKeys.worker_active(self.worker_id) - worker_data = json.dumps( - {"status": "draining", "active_calls": len(self._active_tasks)} - ) - await self.redis.setex(worker_key, 30, worker_data) - - # Wait for active tasks to complete (with timeout) - start_time = asyncio.get_event_loop().time() - while ( - self._active_tasks - and (asyncio.get_event_loop().time() - start_time) < max_wait_seconds - ): - active_count = len(self._active_tasks) - logger.info( - f"Worker {self.worker_id} waiting for {active_count} active calls to complete" - ) - - # Update Redis with current status - worker_data = json.dumps( - {"status": "draining", "active_calls": active_count} - ) - await self.redis.setex(worker_key, 30, worker_data) - - # Wait a bit before checking again - await asyncio.sleep(5) - - # Force stop if timeout reached - if self._active_tasks: - logger.warning( - f"Worker {self.worker_id} forcefully stopping {len(self._active_tasks)} active calls after timeout channel_ids: {list(self._active_tasks.keys())}" - ) - - await self.stop() - - async def stop(self): - """Stop the event subscriber and deregister from Redis.""" - self._running = False - - # Deregister from Redis - await self._deregister_worker() - - # Cancel all active call processing tasks - for channel_id, task in list(self._active_tasks.items()): - if not task.done(): - logger.info(f"Cancelling active call task for channel {channel_id}") - task.cancel() - - # Cancel all cleanup tasks - for channel_id, task in list(self._cleanup_tasks.items()): - if not task.done(): - logger.info(f"Cancelling cleanup task for channel {channel_id}") - task.cancel() - - # Wait for all tasks to complete - all_tasks = list(self._active_tasks.values()) + list( - self._cleanup_tasks.values() - ) - if all_tasks: - await asyncio.gather(*all_tasks, return_exceptions=True) - - # Cancel heartbeat task - if self._heartbeat_task: - self._heartbeat_task.cancel() - try: - await self._heartbeat_task - except asyncio.CancelledError: - pass - - if self._task: - self._task.cancel() - try: - await self._task - except asyncio.CancelledError: - pass - - logger.info(f"Worker {self.worker_id} event subscriber stopped") - - async def _deregister_worker(self): - """Remove this worker from Redis.""" - try: - # Remove from active workers - await self.redis.delete(RedisKeys.worker_active(self.worker_id)) - - # Remove from workers set - await self.redis.srem(RedisKeys.workers_set(), self.worker_id) - - logger.info(f"Worker {self.worker_id} deregistered from Redis") - except Exception as e: - logger.error(f"Error deregistering worker {self.worker_id}: {e}") - - async def _run(self): - """Main subscriber loop.""" - self._running = True - channel = RedisChannels.worker_events(self.worker_id) - pubsub = self.redis.pubsub() - - try: - await pubsub.subscribe(channel) - logger.info(f"Worker {self.worker_id} subscribed to {channel}") - - async for message in pubsub.listen(): - if not self._running: - break - - if message["type"] == "message": - try: - await self._handle_event(message["data"]) - except Exception as e: - logger.exception(f"Error handling event: {e}") - - except asyncio.CancelledError: - logger.debug(f"Worker {self.worker_id} subscriber cancelled") - except Exception as e: - logger.exception(f"Worker {self.worker_id} subscriber error: {e}") - finally: - await pubsub.unsubscribe(channel) - await pubsub.aclose() - - async def _handle_event(self, data: str): - """Handle an event from the ARI Manager.""" - event = parse_event(data) - if not event: - logger.warning(f"Failed to parse event: {data}") - return - - if isinstance(event, StasisStartEvent): - await self._handle_stasis_start(event) - elif isinstance(event, StasisEndEvent): - await self._handle_stasis_end(event) - else: - logger.warning( - f"channelID: {event.channel_id} Unhandled event type: {type(event)}" - ) - - async def _handle_stasis_start(self, event: StasisStartEvent): - """Handle a new call assignment.""" - - channel_id = event.channel_id - logger.info( - f"channelID: {channel_id} Worker {self.worker_id} handling StasisStart" - ) - - try: - # Create StasisRTPConnection without ARI client - connection = StasisRTPConnection( - redis_client=self.redis, - channel_id=channel_id, - caller_channel_id=event.caller_channel_id, - em_channel_id=event.em_channel_id, - bridge_id=event.bridge_id, - local_addr=tuple(event.local_addr) if event.local_addr else None, - remote_addr=tuple(event.remote_addr) if event.remote_addr else None, - ) - - # Store connection for cleanup - self._active_connections[channel_id] = connection - - # Create a background task to handle the call - task = asyncio.create_task( - self._process_call(connection, event.call_context_vars, channel_id), - name=f"call_handler_{channel_id}", - ) - self._active_tasks[channel_id] = task - - except Exception as e: - logger.exception(f"Error handling StasisStart for {channel_id}: {e}") - # Send disconnect command if setup fails - await self._send_disconnect(channel_id, "setup_failed") - - async def _process_call( - self, connection: StasisRTPConnection, call_context_vars: dict, channel_id: str - ): - """Process a call in the background.""" - try: - await self.on_stasis_call(connection, call_context_vars) - except Exception as e: - logger.exception(f"Error processing call for {channel_id}: {e}") - # Send disconnect command if call processing fails - await self._send_disconnect(channel_id, "processing_failed") - finally: - # Clean up task reference - if channel_id in self._active_tasks: - del self._active_tasks[channel_id] - - async def _process_cleanup(self, channel_id: str): - """Process call cleanup in the background.""" - try: - if channel_id in self._active_connections: - connection: StasisRTPConnection = self._active_connections[channel_id] - - # We must wait for the connection's invocation - # before sending in remote disconnect. Otherwise, - # the event handlers won't be registered and we won't - # be able to call on_client_disconnected to cancel the - # pipeline - while not connection._connect_invoked: - await asyncio.sleep(0.1) - - # Set the run_id context so that we can have it in logs - if connection.workflow_run_id: - set_current_run_id(connection.workflow_run_id) - - await connection.handle_remote_disconnect() - del self._active_connections[channel_id] - except Exception as e: - logger.exception(f"Error during cleanup for {channel_id}: {e}") - finally: - # Clean up task reference from cleanup tasks dictionary - if channel_id in self._cleanup_tasks: - del self._cleanup_tasks[channel_id] - - async def _handle_stasis_end(self, event: StasisEndEvent): - """Handle call termination.""" - channel_id = event.channel_id - logger.info( - f"channelID: {channel_id} Worker {self.worker_id} handling StasisEnd" - ) - - # Create a background task to handle the cleanup - if channel_id in self._active_connections: - # Check if there's already a cleanup task for this channel - if ( - channel_id not in self._cleanup_tasks - or self._cleanup_tasks[channel_id].done() - ): - # Lets start a new task, since we need to poll for - # connection to be invoked from the pipeline before - # caling remote disconnect - task = asyncio.create_task( - self._process_cleanup(channel_id), - name=f"cleanup_handler_{channel_id}", - ) - self._cleanup_tasks[channel_id] = task - else: - logger.warning( - f"channelID: {channel_id} Cleanup skipped - cleanup task still running" - ) - - async def _send_disconnect(self, channel_id: str, reason: str): - """Send disconnect command to ARI Manager.""" - - command = DisconnectCommand(channel_id=channel_id, reason=reason) - channel = RedisChannels.channel_commands(channel_id) - await self.redis.publish(channel, command.to_json()) - - -async def setup_worker_subscriber( - redis_client: aioredis.Redis, -) -> WorkerEventSubscriber: - """Setup the worker event subscriber with dynamic registration.""" - subscriber = WorkerEventSubscriber(redis_client, on_stasis_call) - logger.info(f"Setting up worker event subscriber with ID {subscriber.worker_id}") - await subscriber.start() - return subscriber diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index 56d24d2..cfee3cb 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -18,7 +18,6 @@ from pipecat.services.llm_service import FunctionCallParams from pipecat.utils.enums import EndTaskReason if TYPE_CHECKING: - from api.services.telephony.stasis_rtp_connection import StasisRTPConnection from pipecat.frames.frames import Frame from pipecat.services.anthropic.llm import AnthropicLLMService from pipecat.services.google.llm import GoogleLLMService @@ -83,9 +82,6 @@ class PipecatEngine: self._gathered_context: dict = {} self._user_response_timeout_task: Optional[asyncio.Task] = None - # Stasis connection for immediate transfers - self._stasis_connection: Optional["StasisRTPConnection"] = None - # Will be set later in initialize() when we have # access to _context self._variable_extraction_manager = None @@ -695,23 +691,6 @@ class PipecatEngine: """ self.task = task - def set_stasis_connection( - self, connection: Optional["StasisRTPConnection"] - ) -> None: - """Set the Stasis RTP connection for immediate transfers. - - This allows the engine to initiate transfers immediately when XFER - disposition is detected, without waiting for pipeline shutdown. - - Args: - connection: The StasisRTPConnection instance, or None for non-Stasis transports - """ - self._stasis_connection = connection - if connection: - logger.debug( - f"Stasis connection set for immediate transfers: {connection.channel_id}" - ) - def set_audio_config(self, audio_config) -> None: """Set the audio configuration for the pipeline.""" self._audio_config = audio_config diff --git a/pipecat b/pipecat index e5390c0..83d4397 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit e5390c06c158d7051640e5e295c51f879ad143c3 +Subproject commit 83d43970a7539251233a118e6768ba70cef144cc diff --git a/ui/src/constants/workflowRunModes.ts b/ui/src/constants/workflowRunModes.ts index cfedee0..eed3518 100644 --- a/ui/src/constants/workflowRunModes.ts +++ b/ui/src/constants/workflowRunModes.ts @@ -7,7 +7,6 @@ export const WORKFLOW_RUN_MODES = { VONAGE: 'vonage', VOBIZ: 'vobiz', CLOUDONIX: 'cloudonix', - STASIS: 'stasis', WEBRTC: 'webrtc', SMALL_WEBRTC: 'smallwebrtc', } as const;