dograh/api/services/telephony/ari_client.py
Abhishek Kumar 4f2a629340 Initial Commit 🚀 🚀
2025-09-09 14:37:32 +05:30

765 lines
28 KiB
Python

"""
Dynamic ARI client that generates methods from Swagger/OpenAPI specification.
Pure asyncio implementation without anyio dependencies.
"""
import asyncio
import json
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional
from urllib.parse import urljoin
import aiohttp
from loguru import logger
class SwaggerMethod:
"""Represents a Swagger API method."""
def __init__(
self, client: "AsyncARIClient", path: str, method: str, operation: dict
):
self.client = client
self.path = path
self.http_method = method.upper()
self.operation = operation
self.operation_id = operation.get("operationId", "")
self.parameters = operation.get("parameters", [])
self.description = operation.get("description", "")
def _build_path(self, **kwargs) -> str:
"""Build the actual path by substituting path parameters."""
path = self.path
# Replace path parameters like {channelId} with actual values
for param in self.parameters:
# Swagger spec uses 'paramType' not 'in'
if param.get("paramType", param.get("in")) == "path":
param_name = param["name"]
if param_name in kwargs:
path = path.replace(f"{{{param_name}}}", str(kwargs[param_name]))
return path
def _build_params(self, **kwargs) -> dict:
"""Extract query parameters from kwargs."""
params = {}
for param in self.parameters:
# Swagger spec uses 'paramType' not 'in'
if param.get("paramType", param.get("in")) == "query":
param_name = param["name"]
if param_name in kwargs:
params[param_name] = kwargs[param_name]
return params
def _build_body(self, **kwargs) -> dict:
"""Extract body parameters from kwargs."""
body = {}
for param in self.parameters:
# Swagger 1.2 uses 'paramType' = 'body' for body parameters
if param.get("paramType", param.get("in")) == "body":
param_name = param["name"]
if param_name in kwargs:
# In Swagger 1.2, body param is usually the whole body
return (
kwargs[param_name]
if isinstance(kwargs[param_name], dict)
else {param_name: kwargs[param_name]}
)
return body
async def __call__(self, **kwargs):
"""Execute the API method."""
path = self._build_path(**kwargs)
params = self._build_params(**kwargs)
# Check if there's a body parameter defined in the spec
body_data = self._build_body(**kwargs)
# If no body param in spec, use remaining kwargs for body (backward compat)
if not body_data:
# Remove path and query parameters from kwargs (leaving body params)
# Swagger spec uses 'paramType' not 'in'
path_param_names = {
p["name"]
for p in self.parameters
if p.get("paramType", p.get("in")) == "path"
}
query_param_names = {
p["name"]
for p in self.parameters
if p.get("paramType", p.get("in")) == "query"
}
body_param_names = {
p["name"]
for p in self.parameters
if p.get("paramType", p.get("in")) == "body"
}
body_data = {
k: v
for k, v in kwargs.items()
if k not in path_param_names
and k not in query_param_names
and k not in body_param_names
}
# Debug logging for externalMedia
if "externalMedia" in path:
logger.debug(
f"externalMedia call - method: {self.http_method}, path: {path}, params: {params}"
)
if self.http_method == "GET":
return await self.client.api_get(path, **params)
elif self.http_method == "POST":
return await self.client.api_post(
path, json_data=body_data if body_data else None, **params
)
elif self.http_method == "PUT":
return await self.client.api_put(
path, json_data=body_data if body_data else None, **params
)
elif self.http_method == "DELETE":
return await self.client.api_delete(path, **params)
else:
raise ValueError(f"Unsupported HTTP method: {self.http_method}")
class ResourceAPI:
"""Represents a resource API (like channels, bridges, etc.)."""
def __init__(self, client: "AsyncARIClient", resource_name: str):
self.client = client
self.resource_name = resource_name
self._methods = {}
def add_method(self, method_name: str, swagger_method: SwaggerMethod):
"""Add a method to this resource."""
self._methods[method_name] = swagger_method
def __getattr__(self, name):
"""Dynamically return methods."""
if name in self._methods:
return self._methods[name]
raise AttributeError(f"'{self.resource_name}' has no method '{name}'")
@dataclass
class Channel:
"""Channel model with dynamic method support."""
id: str
name: str = ""
state: str = ""
caller: Dict[str, str] = field(default_factory=dict)
connected: Dict[str, str] = field(default_factory=dict)
accountcode: str = ""
dialplan: Dict[str, str] = field(default_factory=dict)
creationtime: str = ""
language: str = "en"
# Store reference to client for method calls
_client: Optional["AsyncARIClient"] = field(default=None, repr=False)
@classmethod
def from_dict(cls, data: dict, client=None) -> "Channel":
"""Create Channel from API response."""
channel = cls(
id=data.get("id", ""),
name=data.get("name", ""),
state=data.get("state", ""),
caller=data.get("caller", {}),
connected=data.get("connected", {}),
accountcode=data.get("accountcode", ""),
dialplan=data.get("dialplan", {}),
creationtime=data.get("creationtime", ""),
language=data.get("language", "en"),
_client=client,
)
return channel
async def continueInDialplan(
self,
context: str = None,
extension: str = None,
priority: int = None,
label: str = None,
):
"""Continue channel in dialplan."""
if not self._client:
raise RuntimeError("Channel not associated with a client")
params = {"channelId": self.id}
if context:
params["context"] = context
if extension:
params["extension"] = extension
if priority is not None:
params["priority"] = priority
if label:
params["label"] = label
# The ARI API method is named 'continueInDialplan'
channels_api = self._client.channels
if hasattr(channels_api, "continueInDialplan"):
await channels_api.continueInDialplan(**params)
else:
# Fallback to direct API call
await self._client.api_post(f"/channels/{self.id}/continue", **params)
async def hangup(self, reason: str = "normal"):
"""Hangup the channel."""
if not self._client:
raise RuntimeError("Channel not associated with a client")
await self._client.channels.hangup(channelId=self.id, reason=reason)
async def answer(self):
"""Answer the channel."""
if not self._client:
raise RuntimeError("Channel not associated with a client")
await self._client.channels.answer(channelId=self.id)
async def getChannelVar(self, variable: str):
"""Get a channel variable."""
if not self._client:
raise RuntimeError("Channel not associated with a client")
return await self._client.channels.getChannelVar(
channelId=self.id, variable=variable
)
@dataclass
class Bridge:
"""Bridge model with dynamic method support."""
id: str
technology: str = ""
bridge_type: str = ""
bridge_class: str = ""
creator: str = ""
name: str = ""
channels: List[str] = field(default_factory=list)
_client: Optional["AsyncARIClient"] = field(default=None, repr=False)
@classmethod
def from_dict(cls, data: dict, client=None) -> "Bridge":
"""Create Bridge from API response."""
return cls(
id=data.get("id", ""),
technology=data.get("technology", ""),
bridge_type=data.get("bridge_type", ""),
bridge_class=data.get("bridge_class", ""),
creator=data.get("creator", ""),
name=data.get("name", ""),
channels=data.get("channels", []),
_client=client,
)
async def addChannel(self, channel: str):
"""Add channel to bridge."""
if not self._client:
raise RuntimeError("Bridge not associated with a client")
await self._client.bridges.addChannel(bridgeId=self.id, channel=channel)
async def removeChannel(self, channel: str):
"""Remove channel from bridge."""
if not self._client:
raise RuntimeError("Bridge not associated with a client")
await self._client.bridges.removeChannel(bridgeId=self.id, channel=channel)
async def destroy(self):
"""Destroy the bridge."""
if not self._client:
raise RuntimeError("Bridge not associated with a client")
await self._client.bridges.destroy(bridgeId=self.id)
class AsyncARIClient:
"""ARI client that dynamically generates methods from Swagger spec."""
def __init__(self, base_url: str, username: str, password: str, app: str):
self.base_url = base_url.rstrip("/")
self.username = username
self.password = password
self.app = app
# REST API URL
self.api_url = self.base_url.replace("ws://", "http://").replace(
"wss://", "https://"
)
# WebSocket URL
self.ws_url = (
f"{self.base_url}/ari/events?app={app}&api_key={username}:{password}"
)
# Session and WebSocket
self._session: Optional[aiohttp.ClientSession] = None
self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None
self._running = False
# Event handling
self._event_handlers: Dict[str, List[Callable]] = {}
self._event_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
# Resource APIs (will be populated from Swagger)
self.channels: Optional[ResourceAPI] = None
self.bridges: Optional[ResourceAPI] = None
self.endpoints: Optional[ResourceAPI] = None
self.recordings: Optional[ResourceAPI] = None
self.sounds: Optional[ResourceAPI] = None
self.playbacks: Optional[ResourceAPI] = None
self.asterisk: Optional[ResourceAPI] = None
self.applications: Optional[ResourceAPI] = None
self.deviceStates: Optional[ResourceAPI] = None
self.mailboxes: Optional[ResourceAPI] = None
# Swagger spec cache
self._swagger_spec: Optional[dict] = None
async def connect(self):
"""Connect to ARI and load Swagger spec."""
# Create HTTP session
auth = aiohttp.BasicAuth(self.username, self.password)
self._session = aiohttp.ClientSession(auth=auth)
try:
# Load Swagger spec and generate methods
await self._load_swagger_spec()
# Connect WebSocket
self._websocket = await self._session.ws_connect(
self.ws_url, heartbeat=30, autoping=True
)
self._running = True
logger.info(f"Connected to ARI at {self.ws_url}")
except Exception as e:
await self._session.close()
raise Exception(f"Failed to connect to ARI: {e}")
async def _load_swagger_spec(self):
"""Load Swagger spec and generate dynamic methods."""
spec_loaded = False
try:
# Get Swagger spec from ARI
url = f"{self.api_url}/ari/api-docs/resources.json"
async with self._session.get(url) as resp:
resp.raise_for_status()
resources = await resp.json()
# Store the spec
self._swagger_spec = resources
# Create resource APIs
for api_info in resources.get("apis", []):
resource_path = api_info["path"]
# Fix the path - remove .{format} and add proper prefix
resource_path = resource_path.replace(".{format}", ".json")
# Load detailed spec for this resource
# The resource_path already contains /api-docs/, so we just need the base URL
url = f"{self.api_url}/ari{resource_path}"
try:
async with self._session.get(url) as resp:
resp.raise_for_status()
spec = await resp.json()
self._process_swagger_spec(spec)
spec_loaded = True
except Exception as e:
logger.warning(f"Failed to load spec for {resource_path}: {e}")
if spec_loaded:
logger.info("Loaded Swagger spec and generated dynamic methods")
else:
raise Exception("No individual specs could be loaded")
except Exception as e:
logger.warning(f"Failed to load Swagger spec, using fallback methods: {e}")
self._create_fallback_methods()
def _process_swagger_spec(self, spec: dict):
"""Process a Swagger spec and create dynamic methods."""
# basePath is available in spec but not currently used
for api in spec.get("apis", []):
path = api["path"]
for operation in api.get("operations", []):
self._create_method_from_operation(path, operation)
def _create_method_from_operation(self, path: str, operation: dict):
"""Create a method from a Swagger operation."""
# Swagger spec uses 'httpMethod' not 'method'
method = operation.get("httpMethod", operation.get("method", "GET"))
operation_id = operation.get("nickname", "")
if not operation_id:
return
# Determine resource from path (e.g., /channels/{channelId} -> channels)
path_parts = path.strip("/").split("/")
if path_parts:
resource_name = path_parts[0]
# Create resource API if it doesn't exist
if not hasattr(self, resource_name) or getattr(self, resource_name) is None:
setattr(self, resource_name, ResourceAPI(self, resource_name))
resource_api = getattr(self, resource_name)
# Extract method name from operation ID
# e.g., "channels_continue" -> "continue_"
# or "channels_get" -> "get"
method_name = operation_id
if method_name.startswith(resource_name + "_"):
method_name = method_name[len(resource_name) + 1 :]
# Handle special cases
if method_name == "continue":
method_name = "continue_" # Avoid Python keyword
# Create and add the method
swagger_method = SwaggerMethod(self, path, method, operation)
resource_api.add_method(method_name, swagger_method)
def _create_fallback_methods(self):
"""Create fallback methods if Swagger spec is not available."""
# Create basic resource APIs
self.channels = ResourceAPI(self, "channels")
self.bridges = ResourceAPI(self, "bridges")
# Add essential channel methods
self.channels.add_method(
"get",
SwaggerMethod(
self,
"/channels/{channelId}",
"GET",
{
"operationId": "get",
"parameters": [{"name": "channelId", "in": "path"}],
},
),
)
self.channels.add_method(
"hangup",
SwaggerMethod(
self,
"/channels/{channelId}",
"DELETE",
{
"operationId": "hangup",
"parameters": [
{"name": "channelId", "in": "path"},
{"name": "reason", "in": "query"},
],
},
),
)
self.channels.add_method(
"answer",
SwaggerMethod(
self,
"/channels/{channelId}/answer",
"POST",
{
"operationId": "answer",
"parameters": [{"name": "channelId", "in": "path"}],
},
),
)
self.channels.add_method(
"continueInDialplan",
SwaggerMethod(
self,
"/channels/{channelId}/continue",
"POST",
{
"operationId": "continueInDialplan",
"parameters": [
{"name": "channelId", "in": "path"},
{"name": "context", "in": "query"},
{"name": "extension", "in": "query"},
{"name": "priority", "in": "query"},
{"name": "label", "in": "query"},
],
},
),
)
self.channels.add_method(
"externalMedia",
SwaggerMethod(
self,
"/channels/externalMedia",
"POST",
{
"operationId": "externalMedia",
"parameters": [
{"name": "channelId", "in": "query"}, # Add channelId parameter
{"name": "app", "in": "query"},
{"name": "external_host", "in": "query"},
{"name": "format", "in": "query"},
{"name": "encapsulation", "in": "query"},
{"name": "transport", "in": "query"},
{"name": "connection_type", "in": "query"},
{"name": "direction", "in": "query"},
],
},
),
)
self.channels.add_method(
"getChannelVar",
SwaggerMethod(
self,
"/channels/{channelId}/variable",
"GET",
{
"operationId": "getChannelVar",
"parameters": [
{"name": "channelId", "in": "path"},
{"name": "variable", "in": "query"},
],
},
),
)
# Add essential bridge methods
self.bridges.add_method(
"get",
SwaggerMethod(
self,
"/bridges/{bridgeId}",
"GET",
{
"operationId": "get",
"parameters": [{"name": "bridgeId", "in": "path"}],
},
),
)
self.bridges.add_method(
"create",
SwaggerMethod(
self,
"/bridges",
"POST",
{
"operationId": "create",
"parameters": [
{"name": "type", "in": "query"},
{"name": "name", "in": "query"},
],
},
),
)
self.bridges.add_method(
"addChannel",
SwaggerMethod(
self,
"/bridges/{bridgeId}/addChannel",
"POST",
{
"operationId": "addChannel",
"parameters": [
{"name": "bridgeId", "in": "path"},
{"name": "channel", "in": "query"},
],
},
),
)
self.bridges.add_method(
"removeChannel",
SwaggerMethod(
self,
"/bridges/{bridgeId}/removeChannel",
"POST",
{
"operationId": "removeChannel",
"parameters": [
{"name": "bridgeId", "in": "path"},
{"name": "channel", "in": "query"},
],
},
),
)
self.bridges.add_method(
"destroy",
SwaggerMethod(
self,
"/bridges/{bridgeId}",
"DELETE",
{
"operationId": "destroy",
"parameters": [{"name": "bridgeId", "in": "path"}],
},
),
)
async def disconnect(self):
"""Disconnect from ARI."""
self._running = False
if self._websocket:
await self._websocket.close()
if self._session:
await self._session.close()
async def run(self):
"""Main event loop."""
if not self._websocket:
raise RuntimeError("Not connected")
processor_task = asyncio.create_task(self._process_events())
try:
async for msg in self._websocket:
if msg.type == aiohttp.WSMsgType.TEXT:
try:
event = json.loads(msg.data)
# Wrap channel/bridge objects
if "channel" in event and isinstance(event["channel"], dict):
event["channel"] = Channel.from_dict(event["channel"], self)
if "bridge" in event and isinstance(event["bridge"], dict):
event["bridge"] = Bridge.from_dict(event["bridge"], self)
await self._event_queue.put(event)
except json.JSONDecodeError:
logger.error(f"Invalid JSON: {msg.data}")
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.error(f"WebSocket error: {self._websocket.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSED:
logger.info("WebSocket closed")
break
finally:
self._running = False
processor_task.cancel()
await asyncio.gather(processor_task, return_exceptions=True)
async def _process_events(self):
"""Process events from queue."""
while self._running:
try:
event = await asyncio.wait_for(self._event_queue.get(), timeout=1.0)
event_type = event.get("type")
if event_type:
await self._dispatch_event(event_type, event)
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error processing event: {e}")
async def _dispatch_event(self, event_type: str, event: dict):
"""Dispatch event to handlers."""
handlers = self._event_handlers.get(event_type, [])
if handlers:
logger.debug(
f"AsyncARIClient: Dispatching {event_type} to {len(handlers)} handlers"
)
for i, handler in enumerate(handlers):
try:
logger.debug(
f" AsyncARIClient: Calling {event_type} handler {i + 1}/{len(handlers)}"
)
await handler(event)
except Exception as e:
logger.error(f"Handler {i + 1} error for {event_type}: {e}")
def on_event(self, event_type: str, handler: Callable):
"""Register event handler."""
if event_type not in self._event_handlers:
self._event_handlers[event_type] = []
logger.debug(
f"AsyncARIClient: Registering handler for {event_type}. Current count: {len(self._event_handlers.get(event_type, []))}"
)
self._event_handlers[event_type].append(handler)
logger.debug(
f"AsyncARIClient: After registration, {event_type} handler count: {len(self._event_handlers[event_type])}"
)
# REST API methods
async def api_get(self, path: str, **params) -> dict:
"""GET request."""
# Ensure path starts with /ari if not already
if not path.startswith("/ari"):
path = f"/ari{path}" if path.startswith("/") else f"/ari/{path}"
url = urljoin(self.api_url, path.lstrip("/"))
async with self._session.get(url, params=params) as resp:
resp.raise_for_status()
data = await resp.json()
# Wrap known objects
if isinstance(data, list):
# Handle lists of channels/bridges
if "/channels" in path:
return [
Channel.from_dict(item, self)
if isinstance(item, dict)
else item
for item in data
]
elif "/bridges" in path:
return [
Bridge.from_dict(item, self) if isinstance(item, dict) else item
for item in data
]
return data
elif isinstance(data, dict):
if "/channels/" in path and "id" in data:
return Channel.from_dict(data, self)
elif "/bridges/" in path and "id" in data:
return Bridge.from_dict(data, self)
return data
async def api_post(self, path: str, json_data: dict = None, **params) -> dict:
"""POST request."""
# Ensure path starts with /ari if not already
if not path.startswith("/ari"):
path = f"/ari{path}" if path.startswith("/") else f"/ari/{path}"
url = urljoin(self.api_url, path.lstrip("/"))
async with self._session.post(url, json=json_data, params=params) as resp:
resp.raise_for_status()
if resp.content_length and resp.content_length > 0:
data = await resp.json()
# Wrap known objects
if "id" in data and "state" in data:
return Channel.from_dict(data, self)
elif "id" in data and "bridge_type" in data:
return Bridge.from_dict(data, self)
return data
return {}
async def api_put(self, path: str, json_data: dict = None, **params) -> dict:
"""PUT request."""
# Ensure path starts with /ari if not already
if not path.startswith("/ari"):
path = f"/ari{path}" if path.startswith("/") else f"/ari/{path}"
url = urljoin(self.api_url, path.lstrip("/"))
async with self._session.put(url, json=json_data, params=params) as resp:
resp.raise_for_status()
if resp.content_length and resp.content_length > 0:
return await resp.json()
return {}
async def api_delete(self, path: str, **params) -> dict:
"""DELETE request."""
# Ensure path starts with /ari if not already
if not path.startswith("/ari"):
path = f"/ari{path}" if path.startswith("/") else f"/ari/{path}"
url = urljoin(self.api_url, path.lstrip("/"))
async with self._session.delete(url, params=params) as resp:
resp.raise_for_status()
if resp.content_length and resp.content_length > 0:
return await resp.json()
return {}