feat: add 3CX telephony provider with Asterisk ARA provisioning

Registers a new `three_cx` provider that fronts a 3CX cloud PBX through
an intermediate Asterisk bridge. Save-time hook writes the matching
PJSIP endpoint/aor/auth/registration and dialplan rows to the Asterisk
Realtime Architecture Postgres (via `ASTERISK_ARA_DSN`), so a config
change in the Dograh UI is immediately picked up by Asterisk without a
`pjsip reload`. Strip prefix is honoured at the dialplan layer.

Inbound calls are matched back to a configuration by the dialled
extension (`account_id_credential_field="extension"`), allowing one
shared Asterisk to serve multiple Dograh orgs without collision.

Touches `providers/__init__.py` and `schemas/telephony_config.py` only
— per `providers/AGENTS.md`. Provider/transport/strategies are
duplicated from `ari/` rather than imported, in line with the
cross-provider-import prohibition. See `docs/providers/three_cx.md` for
the Asterisk ARA setup runbook.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
stefandsl 2026-05-26 13:07:50 +02:00
parent 3df5730076
commit 533a873ab7
13 changed files with 1916 additions and 0 deletions

View file

@ -11,6 +11,7 @@ from api.services.telephony.providers import ( # noqa: F401 -- import for side
cloudonix,
plivo,
telnyx,
three_cx,
twilio,
vobiz,
vonage,

View file

@ -0,0 +1,129 @@
"""3CX (PJSIP trunk via Asterisk bridge) telephony provider package."""
from typing import Any, Dict
from api.services.telephony.registry import (
ProviderSpec,
ProviderUIField,
ProviderUIMetadata,
register,
)
from .config import ThreeCxConfigurationRequest, ThreeCxConfigurationResponse
from .provider import ThreeCxProvider
from .provisioning import _provision_3cx_trunk, endpoint_id_for
from .transport import create_transport
def _config_loader(value: Dict[str, Any]) -> Dict[str, Any]:
"""Reshape stored JSONB credentials into the provider constructor dict."""
return {
"provider": "three_cx",
"ari_endpoint": value.get("ari_endpoint"),
"app_name": value.get("app_name"),
"app_password": value.get("app_password"),
"ws_client_name": value.get("ws_client_name", ""),
"sip_domain": value.get("sip_domain"),
"extension": value.get("extension"),
"strip_prefix": value.get("strip_prefix", ""),
"from_numbers": value.get("from_numbers", []),
}
_UI_METADATA = ProviderUIMetadata(
display_name="3CX (Asterisk bridge)",
docs_url="https://docs.dograh.com/integrations/telephony/three-cx",
fields=[
ProviderUIField(
name="ari_endpoint",
label="ARI Endpoint",
type="text",
description="ARI base URL of the bridging Asterisk (http://host:8088)",
),
ProviderUIField(
name="app_name",
label="Stasis App Name",
type="text",
description="Stasis application name registered in Asterisk",
),
ProviderUIField(
name="app_password",
label="ARI Password",
type="password",
sensitive=True,
),
ProviderUIField(
name="ws_client_name",
label="websocket_client.conf Name",
type="text",
description="websocket_client.conf connection name for externalMedia",
),
ProviderUIField(
name="sip_domain",
label="3CX SIP Domain",
type="text",
description="Your 3CX cloud host (e.g. 1156.3cx.cloud)",
placeholder="1156.3cx.cloud",
),
ProviderUIField(
name="extension",
label="3CX Extension",
type="text",
description="Extension number registered for Dograh (e.g. 12611)",
placeholder="12611",
),
ProviderUIField(
name="sip_password",
label="SIP Password",
type="password",
sensitive=True,
description="SIP auth password for the extension on 3CX",
),
ProviderUIField(
name="strip_prefix",
label="Strip Prefix (regex)",
type="text",
required=False,
description=(
"Optional regex stripped from outbound numbers before dialling. "
"Only the literal '^\\+<digits>' form is supported "
"(Italian deployments use '^\\+39')."
),
placeholder="^\\+39",
),
ProviderUIField(
name="from_numbers",
label="From Numbers",
type="string-array",
required=False,
description="E.164 caller-IDs permitted on outbound calls",
),
],
)
SPEC = ProviderSpec(
name="three_cx",
provider_cls=ThreeCxProvider,
config_loader=_config_loader,
transport_factory=create_transport,
transport_sample_rate=8000,
config_request_cls=ThreeCxConfigurationRequest,
config_response_cls=ThreeCxConfigurationResponse,
ui_metadata=_UI_METADATA,
account_id_credential_field="extension",
preprocess_credentials_on_save=_provision_3cx_trunk,
)
register(SPEC)
__all__ = [
"SPEC",
"ThreeCxConfigurationRequest",
"ThreeCxConfigurationResponse",
"ThreeCxProvider",
"create_transport",
"endpoint_id_for",
]

View file

@ -0,0 +1,69 @@
"""Async connection pool to the Asterisk Realtime Architecture Postgres.
Lives separate from Dograh's primary SQLAlchemy engine because the ARA
Postgres is operationally distinct (Asterisk-owned schema, typically a
different host, different credentials). DSN comes from the
``ASTERISK_ARA_DSN`` environment variable.
"""
from __future__ import annotations
import os
from typing import Optional
import asyncpg
from loguru import logger
_POOL: Optional[asyncpg.Pool] = None
_DSN_ENV = "ASTERISK_ARA_DSN"
class AraNotConfiguredError(RuntimeError):
"""Raised when ASTERISK_ARA_DSN is missing.
The 3CX provider can't provision its trunk without an ARA Postgres to
write to callers translate this into a user-visible HTTP 400 with a
pointer to docs/providers/three_cx.md.
"""
async def get_pool() -> asyncpg.Pool:
"""Return the lazily-initialised ARA pool. Idempotent across awaits."""
global _POOL
if _POOL is not None:
return _POOL
dsn = os.getenv(_DSN_ENV)
if not dsn:
raise AraNotConfiguredError(
f"{_DSN_ENV} not set — 3CX provider needs an Asterisk Realtime "
f"Postgres DSN to provision the PJSIP trunk. See "
f"docs/providers/three_cx.md for setup."
)
logger.info(f"[3CX/ARA] opening asyncpg pool to {_dsn_for_log(dsn)}")
_POOL = await asyncpg.create_pool(
dsn=dsn,
min_size=1,
max_size=4,
command_timeout=10,
)
return _POOL
async def close_pool() -> None:
"""Close the pool — exposed for test teardown and graceful shutdown."""
global _POOL
if _POOL is not None:
await _POOL.close()
_POOL = None
def _dsn_for_log(dsn: str) -> str:
"""Strip the password from a DSN before logging it."""
if "@" not in dsn or "://" not in dsn:
return "<dsn>"
scheme, rest = dsn.split("://", 1)
creds, host = rest.split("@", 1)
user = creds.split(":", 1)[0] if ":" in creds else creds
return f"{scheme}://{user}:***@{host}"

View file

@ -0,0 +1,83 @@
"""3CX (via Asterisk PJSIP trunk) telephony configuration schemas."""
from typing import List, Literal, Optional
from pydantic import BaseModel, Field, field_validator
class ThreeCxConfigurationRequest(BaseModel):
"""Request schema for a 3CX trunk fronted by an Asterisk ARA instance.
The provider owns two distinct credential groups:
* **Asterisk side** (``ari_endpoint``, ``app_name``, ``app_password``,
``ws_client_name``) how Dograh's REST + externalMedia loop talks to
the bridging Asterisk box at call time. Identical in role to the ARI
provider.
* **3CX side** (``sip_domain``, ``extension``, ``sip_password``,
``strip_prefix``) the upstream PBX peer credentials. Dograh never
speaks SIP itself; these are consumed at save time by
``preprocess_credentials_on_save`` to provision the matching PJSIP
endpoint/aor/auth/registration rows on the Asterisk ARA Postgres.
"""
provider: Literal["three_cx"] = Field(default="three_cx")
ari_endpoint: str = Field(
..., description="ARI base URL of the bridging Asterisk (e.g., http://asterisk:8088)"
)
app_name: str = Field(
..., description="Stasis application name registered in Asterisk"
)
app_password: str = Field(..., description="ARI user password")
ws_client_name: str = Field(
default="",
description="websocket_client.conf connection name for externalMedia",
)
sip_domain: str = Field(
..., description="3CX SIP host/domain (e.g., 1156.3cx.cloud)"
)
extension: str = Field(..., description="3CX extension number (e.g., 12611)")
sip_password: str = Field(..., description="SIP auth password for the extension")
strip_prefix: str = Field(
default="",
description=(
"Optional regex stripped from outbound destinations before the call "
"hits the trunk. Italian deployments typically use '^\\+39'."
),
)
from_numbers: List[str] = Field(
default_factory=list,
description="E.164 numbers permitted as caller-id for outbound calls",
)
@field_validator("sip_domain")
@classmethod
def _strip_sip_domain(cls, v: str) -> str:
return (v or "").strip().lower()
@field_validator("extension")
@classmethod
def _strip_extension(cls, v: str) -> str:
return (v or "").strip()
class ThreeCxConfigurationResponse(BaseModel):
"""Response schema for a 3CX configuration.
``app_password`` and ``sip_password`` are masked by the org route layer
before serialization see ``ProviderUIField.sensitive`` in __init__.py.
"""
provider: Literal["three_cx"] = Field(default="three_cx")
ari_endpoint: str
app_name: str
app_password: str # Masked
ws_client_name: str = ""
sip_domain: str
extension: str
sip_password: str # Masked
strip_prefix: str = ""
from_numbers: List[str]

View file

@ -0,0 +1,92 @@
"""Builds Asterisk dialplan rows for a 3CX trunk.
Two contexts are generated per trunk:
* ``<endpoint_id>-outbound`` dialed by the Stasis app when Dograh
originates a call. Honours ``strip_prefix`` by translating the regex
to an Asterisk pattern-match exten and using ``${EXTEN:N}`` to skip
the matched prefix on the way out.
* ``<endpoint_id>-inbound`` the ``context=`` on the PJSIP endpoint.
Routes any incoming call from the trunk straight into the Stasis
app so Dograh's ari_manager picks it up.
We deliberately keep the dialplan minimal anything fancier (IVR,
office-hours routing) belongs in a hand-written context the admin can
include before/after this generated one.
"""
from __future__ import annotations
import re
from typing import List, Tuple
# Asterisk understands its own ad-hoc pattern syntax — not POSIX/PCRE
# regex. We translate the small subset Italian deployments need
# (``^\+39``) and fall back to a verbatim match when the prefix is empty.
_SUPPORTED_PREFIX_RE = re.compile(r"^\^\\?\+(\d+)$")
def _prefix_to_pattern(strip_prefix: str) -> Tuple[str, int]:
"""Translate a small regex into (Asterisk extension pattern, chars-to-skip).
Examples
--------
>>> _prefix_to_pattern("^\\+39")
('_+39N.', 3)
>>> _prefix_to_pattern("")
('_X.', 0)
"""
if not strip_prefix:
return ("_X.", 0)
m = _SUPPORTED_PREFIX_RE.match(strip_prefix)
if not m:
raise ValueError(
f"Unsupported strip_prefix regex {strip_prefix!r}. "
f"Only literal '^\\+<digits>' is supported."
)
digits = m.group(1)
return (f"_+{digits}N.", len(digits) + 1) # +1 for the literal '+'
def build_dialplan_rows(
*,
endpoint_id: str,
extension: str,
stasis_app: str,
strip_prefix: str,
) -> List[dict]:
"""Return ARA ``extensions`` rows for this trunk's inbound + outbound contexts."""
pattern, skip = _prefix_to_pattern(strip_prefix)
dest = f"${{EXTEN:{skip}}}" if skip else "${EXTEN}"
outbound_context = f"{endpoint_id}-outbound"
inbound_context = f"{endpoint_id}-inbound"
return [
{
"context": outbound_context,
"exten": pattern,
"priority": 1,
"app": "Dial",
"appdata": f"PJSIP/{dest}@{endpoint_id},60",
},
{
"context": inbound_context,
"exten": extension,
"priority": 1,
"app": "Stasis",
"appdata": f"{stasis_app},inbound,{endpoint_id}",
},
{
"context": inbound_context,
"exten": "_X.",
"priority": 1,
"app": "Stasis",
"appdata": f"{stasis_app},inbound,{endpoint_id}",
},
]
def outbound_context_for(endpoint_id: str) -> str:
"""The dialplan context name the Stasis app should Originate into."""
return f"{endpoint_id}-outbound"

View file

@ -0,0 +1,439 @@
"""3CX telephony provider — Asterisk PJSIP trunk to a 3CX cloud PBX.
Functionally a specialisation of ARI: the runtime call control flow is
identical (REST originate + Stasis externalMedia), but the provider
carries the 3CX trunk credentials and matches inbound calls back to a
configuration by ``extension``.
We duplicate the ARI provider body rather than subclassing it because
``providers/AGENTS.md`` forbids cross-provider imports. A future
``services/telephony/asterisk_base.py`` extraction should consolidate the
shared logic.
"""
import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from urllib.parse import urlparse
import aiohttp
from fastapi import HTTPException
from loguru import logger
from api.db import db_client
from api.services.telephony.base import (
CallInitiationResult,
NormalizedInboundData,
TelephonyProvider,
)
if TYPE_CHECKING:
from fastapi import WebSocket
class ThreeCxProvider(TelephonyProvider):
"""3CX-over-Asterisk implementation of TelephonyProvider."""
PROVIDER_NAME = "three_cx"
WEBHOOK_ENDPOINT = None # 3CX uses WebSocket events via Asterisk, not webhooks
def __init__(self, config: Dict[str, Any]):
"""Initialise from the normalised config dict produced by _config_loader."""
self.ari_endpoint = (config.get("ari_endpoint") or "").rstrip("/")
self.app_name = config.get("app_name", "")
self.app_password = config.get("app_password", "")
self.from_numbers = config.get("from_numbers", [])
# 3CX trunk identity — carried for inbound matching and for the
# provisioning hook to address the right ARA rows. Not used at
# runtime by REST call control (Asterisk owns the SIP leg).
self.sip_domain = (config.get("sip_domain") or "").strip().lower()
self.extension = (config.get("extension") or "").strip()
self.strip_prefix = config.get("strip_prefix", "")
if isinstance(self.from_numbers, str):
self.from_numbers = [self.from_numbers]
self.base_url = f"{self.ari_endpoint}/ari"
def _get_auth(self) -> aiohttp.BasicAuth:
return aiohttp.BasicAuth(self.app_name, self.app_password)
async def initiate_call(
self,
to_number: str,
webhook_url: str,
workflow_run_id: Optional[int] = None,
from_number: Optional[str] = None,
**kwargs: Any,
) -> CallInitiationResult:
"""Originate an outbound call via the bridging Asterisk.
The dialled number is routed through the generated outbound dialplan
context (``<endpoint_id>-outbound``) so the ``strip_prefix`` regex
the admin saved is honoured at the dialplan layer, not in Python.
"""
if not self.validate_config():
raise ValueError("3CX provider not properly configured")
endpoint = f"{self.base_url}/channels"
# Local-channel into the generated outbound context, which contains
# the strip_prefix-aware Dial(PJSIP/...@<endpoint_id>) row.
endpoint_id = self._endpoint_id()
sip_endpoint = f"Local/{to_number}@{endpoint_id}-outbound"
params = {
"endpoint": sip_endpoint,
"app": self.app_name,
"appArgs": ",".join(
filter(
None,
[
f"workflow_run_id={workflow_run_id}",
f"workflow_id={kwargs.get('workflow_id', '')}",
f"user_id={kwargs.get('user_id', '')}",
],
)
),
}
if from_number:
params["callerId"] = from_number
logger.info(
f"[3CX] Initiating call to {to_number} via {sip_endpoint} "
f"(workflow_run_id={workflow_run_id})"
)
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint,
params=params,
auth=self._get_auth(),
) as response:
response_text = await response.text()
if response.status != 200:
logger.error(
f"[3CX] Channel creation failed: "
f"HTTP {response.status} - {response_text}"
)
raise HTTPException(
status_code=response.status,
detail=f"Failed to create 3CX channel: {response_text}",
)
response_data = json.loads(response_text)
channel_id = response_data.get("id", "")
return CallInitiationResult(
call_id=channel_id,
status=response_data.get("state", "created"),
caller_number=from_number,
provider_metadata={
"call_id": channel_id,
"channel_name": response_data.get("name", ""),
},
raw_response=response_data,
)
async def get_call_status(self, call_id: str) -> Dict[str, Any]:
if not self.validate_config():
raise ValueError("3CX provider not properly configured")
url = f"{self.base_url}/channels/{call_id}"
async with aiohttp.ClientSession() as session:
async with session.get(url, auth=self._get_auth()) as response:
if response.status != 200:
raise Exception(
f"Failed to get channel status: {await response.text()}"
)
return await response.json()
async def get_available_phone_numbers(self) -> List[str]:
return self.from_numbers
def validate_config(self) -> bool:
"""Asterisk-side credentials are the only ones required at runtime.
3CX-side credentials (``sip_password`` etc.) are consumed at save time
by the provisioning hook; they're not needed for REST call control.
"""
return bool(self.ari_endpoint and self.app_name and self.app_password)
async def verify_webhook_signature(
self, url: str, params: Dict[str, Any], signature: str
) -> bool:
return True
async def get_webhook_response(
self, workflow_id: int, user_id: int, workflow_run_id: int
) -> str:
logger.warning(
"get_webhook_response called for 3CX — not applicable, "
"control plane is Asterisk REST."
)
return ""
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
return {
"cost_usd": 0.0,
"duration": 0,
"status": "unknown",
"error": "3CX does not surface call cost to Dograh",
}
def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
state_map = {
"Up": "answered",
"Down": "completed",
"Ringing": "ringing",
"Ring": "ringing",
"Busy": "busy",
"Unavailable": "failed",
}
channel_state = data.get("channel", {}).get("state", "")
event_type = data.get("type", "")
if event_type == "StasisStart":
status = "answered"
elif event_type in ("StasisEnd", "ChannelDestroyed"):
status = "completed"
else:
status = state_map.get(channel_state, channel_state.lower())
channel = data.get("channel", {})
return {
"call_id": channel.get("id", ""),
"status": status,
"from_number": channel.get("caller", {}).get("number"),
"to_number": channel.get("dialplan", {}).get("exten"),
"direction": None,
"duration": None,
"extra": data,
}
async def handle_websocket(
self,
websocket: "WebSocket",
workflow_id: int,
user_id: int,
workflow_run_id: int,
) -> None:
from api.services.pipecat.run_pipeline import run_pipeline_telephony
workflow_run = await db_client.get_workflow_run(workflow_run_id, user_id)
channel_id = ""
if workflow_run and workflow_run.gathered_context:
channel_id = workflow_run.gathered_context.get("call_id", "")
logger.info(
f"[3CX] Starting pipeline for workflow_run {workflow_run_id}, "
f"channel={channel_id}"
)
await run_pipeline_telephony(
websocket,
provider_name=self.PROVIDER_NAME,
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
user_id=user_id,
call_id=channel_id,
transport_kwargs={"channel_id": channel_id},
)
# ======== INBOUND CALL METHODS ========
@classmethod
def can_handle_webhook(
cls, webhook_data: Dict[str, Any], headers: Dict[str, str]
) -> bool:
"""3CX uses no HTTP webhook layer — inbound arrives via Stasis events."""
return False
@staticmethod
def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData:
"""Parse a Stasis event into normalised inbound data.
``account_id`` is populated with the dialled extension so the
inbound dispatcher can match it against
``credentials['extension']`` and pick the right 3CX configuration
when multiple coexist in one org.
"""
channel = webhook_data.get("channel", {})
caller = channel.get("caller", {})
exten = channel.get("dialplan", {}).get("exten", "")
return NormalizedInboundData(
provider=ThreeCxProvider.PROVIDER_NAME,
call_id=channel.get("id", ""),
from_number=caller.get("number", ""),
to_number=exten,
direction="inbound",
call_status=channel.get("state", ""),
account_id=exten or None,
raw_data=webhook_data,
)
@staticmethod
def validate_account_id(config_data: dict, webhook_account_id: str) -> bool:
"""Match the dialled extension against the saved trunk's extension."""
stored = (config_data or {}).get("extension")
if not stored or not webhook_account_id:
return False
return stored == webhook_account_id
async def verify_inbound_signature(
self,
url: str,
webhook_data: Dict[str, Any],
headers: Dict[str, str],
body: str = "",
) -> bool:
"""3CX authenticates via the Asterisk WebSocket creds; no payload signature."""
return True
async def start_inbound_stream(
self,
*,
websocket_url: str,
workflow_run_id: int,
normalized_data,
backend_endpoint: str,
):
from fastapi import Response
return Response(content="", status_code=204)
@staticmethod
def generate_error_response(error_type: str, message: str) -> tuple:
from fastapi import Response
return Response(
content=json.dumps({"error": error_type, "message": message}),
media_type="application/json",
)
# ======== CALL TRANSFER METHODS ========
def supports_transfers(self) -> bool:
return True
async def transfer_call(
self,
destination: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
"""Transfer by originating a destination channel and bridge-swapping it in."""
if not self.validate_config():
raise ValueError("3CX provider not properly configured")
from api.services.telephony.call_transfer_manager import (
get_call_transfer_manager,
)
call_transfer_manager = await get_call_transfer_manager()
endpoint_id = self._endpoint_id()
sip_endpoint = f"Local/{destination}@{endpoint_id}-outbound"
app_args = f"transfer,{transfer_id}"
try:
endpoint = f"{self.base_url}/channels"
params = {
"endpoint": sip_endpoint,
"app": self.app_name,
"appArgs": app_args,
"timeout": timeout,
}
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint, params=params, auth=self._get_auth()
) as response:
response_text = await response.text()
if response.status != 200:
await call_transfer_manager.remove_transfer_context(
transfer_id
)
raise Exception(
f"3CX channel creation failed: "
f"{response.status} {response_text}"
)
result = json.loads(response_text)
destination_channel_id = result.get("id", "")
if not destination_channel_id:
await call_transfer_manager.remove_transfer_context(transfer_id)
raise Exception("Failed to create destination channel")
await call_transfer_manager.store_transfer_channel_mapping(
destination_channel_id, transfer_id
)
return {
"call_sid": destination_channel_id,
"status": "initiated",
"provider": self.PROVIDER_NAME,
"raw_response": result,
}
except Exception as e:
logger.error(f"[3CX Transfer] Failed: {e}")
await call_transfer_manager.remove_transfer_context(transfer_id)
raise
# ======== 3CX-SPECIFIC HELPERS ========
def _endpoint_id(self) -> str:
"""Globally unique Asterisk endpoint name for this trunk.
Matches the naming used by the provisioning hook so dialplan + REST
agree on which PJSIP endpoint to address. See provisioning.py.
"""
from .provisioning import endpoint_id_for
return endpoint_id_for(self.sip_domain, self.extension)
async def hangup_channel(self, channel_id: str, reason: str = "normal") -> bool:
endpoint = f"{self.base_url}/channels/{channel_id}"
params = {"reason_code": reason}
try:
async with aiohttp.ClientSession() as session:
async with session.delete(
endpoint, params=params, auth=self._get_auth()
) as response:
if response.status in (200, 204):
return True
logger.error(
f"[3CX] Failed to hangup channel {channel_id}: "
f"{await response.text()}"
)
return False
except Exception as e:
logger.error(f"[3CX] Exception hanging up channel {channel_id}: {e}")
return False
async def answer_channel(self, channel_id: str) -> bool:
endpoint = f"{self.base_url}/channels/{channel_id}/answer"
try:
async with aiohttp.ClientSession() as session:
async with session.post(endpoint, auth=self._get_auth()) as response:
return response.status in (200, 204)
except Exception as e:
logger.error(f"[3CX] Exception answering channel {channel_id}: {e}")
return False
def get_ws_url(self) -> str:
"""ARI WebSocket URL for the standalone event listener (ari_manager)."""
parsed = urlparse(self.ari_endpoint)
ws_scheme = "wss" if parsed.scheme == "https" else "ws"
return (
f"{ws_scheme}://{parsed.netloc}/ari/events"
f"?api_key={self.app_name}:{self.app_password}"
f"&app={self.app_name}"
f"&subscribeAll=true"
)

View file

@ -0,0 +1,262 @@
"""Provision a 3CX PJSIP trunk on the bridging Asterisk via ARA Postgres.
Called by ``ProviderSpec.preprocess_credentials_on_save`` whenever a
TelephonyConfiguration of type ``three_cx`` is created or updated. Writes
the standard six-table PJSIP realtime set:
* ``ps_auths`` userpass auth for outbound REGISTER + inbound 401 challenge
* ``ps_aors`` single contact, qualify keepalive
* ``ps_endpoints`` codec list, dialplan context, auth/aor references
* ``ps_registrations`` outbound REGISTER toward the 3CX cloud SBC
* ``extensions`` (x N) dialplan rows produced by ``dialplan.build_dialplan_rows``
Idempotent on re-save: every row keyed by the deterministic
``endpoint_id_for(sip_domain, extension)`` is deleted first and then
re-inserted in the same transaction. The preprocessor is allowed to do
I/O (registry.py docstring) but must remain re-entrant from the route
layer's point of view.
"""
from __future__ import annotations
import re
from typing import Any, Dict
from fastapi import HTTPException
from loguru import logger
from .ara_db import AraNotConfiguredError, get_pool
from .dialplan import build_dialplan_rows
# Stasis app name as configured in the bridging Asterisk's
# websocket_client.conf. Mirrors the ``app_name`` field on the
# configuration — see runbook §1.
_STASIS_APP_KEY = "app_name"
# Default codecs: G.711a + G.711μ cover 3CX defaults; "ulaw,alaw" is the
# ordered allow list, "all" the disallow base.
_DEFAULT_ALLOW = "ulaw,alaw"
_DEFAULT_DISALLOW = "all"
# Asterisk-side transport name configured by the admin in pjsip.conf
# (e.g. ``transport-udp``). The runbook tells the admin how to set this
# up; the provider just references it by name. Override per-deployment
# via env var on the calling process if necessary.
_TRANSPORT_NAME_DEFAULT = "transport-udp"
# ---------------------------------------------------------------------------
# Public surface
# ---------------------------------------------------------------------------
def endpoint_id_for(sip_domain: str, extension: str) -> str:
"""Deterministic, globally-unique Asterisk endpoint id for this trunk.
Form: ``dograh_<slug(sip_domain)>_<extension>``. Two TelephonyConfigurations
can't legitimately collide because two Asterisks can't simultaneously
register the same (domain, extension) pair upstream anyway.
>>> endpoint_id_for('1156.3cx.cloud', '12611')
'dograh_1156_3cx_cloud_12611'
"""
slug = re.sub(r"[^a-z0-9]+", "_", (sip_domain or "").lower()).strip("_")
ext = re.sub(r"[^A-Za-z0-9]+", "", extension or "")
if not slug or not ext:
raise ValueError(
f"Cannot derive endpoint_id from sip_domain={sip_domain!r} "
f"extension={extension!r}"
)
return f"dograh_{slug}_{ext}"
async def _provision_3cx_trunk(credentials: Dict[str, Any]) -> Dict[str, Any]:
"""Preprocessor hook — writes the ARA rows for this trunk.
Returns the credentials dict unchanged (the provider re-derives
``endpoint_id`` deterministically at runtime, so nothing extra needs
to be persisted).
Raises ``HTTPException`` on validation failure or ARA write failure so
the route layer aborts the DB save matches the Cloudonix pattern.
"""
required = ("sip_domain", "extension", "sip_password", "app_name")
missing = [k for k in required if not credentials.get(k)]
if missing:
raise HTTPException(
status_code=400,
detail=f"3CX provision: missing required credential(s): {missing}",
)
sip_domain = credentials["sip_domain"].strip().lower()
extension = credentials["extension"].strip()
sip_password = credentials["sip_password"]
stasis_app = credentials[_STASIS_APP_KEY]
strip_prefix = credentials.get("strip_prefix", "")
endpoint_id = endpoint_id_for(sip_domain, extension)
transport_name = credentials.get("transport_name", _TRANSPORT_NAME_DEFAULT)
try:
pool = await get_pool()
except AraNotConfiguredError as e:
raise HTTPException(status_code=400, detail=str(e))
dialplan_rows = build_dialplan_rows(
endpoint_id=endpoint_id,
extension=extension,
stasis_app=stasis_app,
strip_prefix=strip_prefix,
)
try:
async with pool.acquire() as conn:
async with conn.transaction():
await _delete_existing(conn, endpoint_id)
await _insert_auth(conn, endpoint_id, extension, sip_password)
await _insert_aor(conn, endpoint_id)
await _insert_endpoint(
conn, endpoint_id, transport_name, sip_domain
)
await _insert_registration(
conn,
endpoint_id=endpoint_id,
transport_name=transport_name,
sip_domain=sip_domain,
extension=extension,
)
await _insert_extensions(conn, dialplan_rows)
except Exception as e:
logger.exception(f"[3CX/ARA] provisioning failed for {endpoint_id}: {e}")
raise HTTPException(
status_code=502,
detail=(
f"3CX provisioning failed while writing to Asterisk ARA: {e}. "
f"No TelephonyConfiguration was saved."
),
)
logger.info(
f"[3CX/ARA] provisioned endpoint={endpoint_id} "
f"(dialplan rows: {len(dialplan_rows)})"
)
return credentials
async def _deprovision_3cx_trunk(credentials: Dict[str, Any]) -> None:
"""Remove all ARA rows for a given trunk.
Not wired into a hook today the registry only exposes the
save-time hook. Exposed as a callable so a future
``post_delete`` extension or admin tooling can use it.
"""
sip_domain = (credentials.get("sip_domain") or "").strip().lower()
extension = (credentials.get("extension") or "").strip()
if not sip_domain or not extension:
return
endpoint_id = endpoint_id_for(sip_domain, extension)
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.transaction():
await _delete_existing(conn, endpoint_id)
logger.info(f"[3CX/ARA] deprovisioned endpoint={endpoint_id}")
# ---------------------------------------------------------------------------
# Per-table writers
# ---------------------------------------------------------------------------
async def _delete_existing(conn, endpoint_id: str) -> None:
"""Strip every ARA row owned by this endpoint_id. Idempotent."""
await conn.execute("DELETE FROM ps_registrations WHERE id = $1", endpoint_id)
await conn.execute("DELETE FROM ps_endpoints WHERE id = $1", endpoint_id)
await conn.execute("DELETE FROM ps_aors WHERE id = $1", endpoint_id)
await conn.execute("DELETE FROM ps_auths WHERE id = $1", endpoint_id)
# Dialplan rows live under two derived contexts.
await conn.execute(
"DELETE FROM extensions WHERE context IN ($1, $2)",
f"{endpoint_id}-inbound",
f"{endpoint_id}-outbound",
)
async def _insert_auth(conn, endpoint_id: str, username: str, password: str) -> None:
await conn.execute(
"""
INSERT INTO ps_auths (id, auth_type, username, password)
VALUES ($1, 'userpass', $2, $3)
""",
endpoint_id,
username,
password,
)
async def _insert_aor(conn, endpoint_id: str) -> None:
await conn.execute(
"""
INSERT INTO ps_aors (id, max_contacts, qualify_frequency)
VALUES ($1, 1, 60)
""",
endpoint_id,
)
async def _insert_endpoint(
conn, endpoint_id: str, transport_name: str, sip_domain: str
) -> None:
await conn.execute(
"""
INSERT INTO ps_endpoints (
id, transport, aors, auth, context,
disallow, allow, from_domain, identify_by
) VALUES ($1, $2, $1, $1, $3, $4, $5, $6, 'auth_username,username')
""",
endpoint_id,
transport_name,
f"{endpoint_id}-inbound",
_DEFAULT_DISALLOW,
_DEFAULT_ALLOW,
sip_domain,
)
async def _insert_registration(
conn,
*,
endpoint_id: str,
transport_name: str,
sip_domain: str,
extension: str,
) -> None:
server_uri = f"sip:{sip_domain}"
client_uri = f"sip:{extension}@{sip_domain}"
await conn.execute(
"""
INSERT INTO ps_registrations (
id, transport, outbound_auth, server_uri, client_uri,
contact_user, expiration, retry_interval
) VALUES ($1, $2, $1, $3, $4, $5, 300, 60)
""",
endpoint_id,
transport_name,
server_uri,
client_uri,
extension,
)
async def _insert_extensions(conn, rows: list[dict]) -> None:
for r in rows:
await conn.execute(
"""
INSERT INTO extensions (context, exten, priority, app, appdata)
VALUES ($1, $2, $3, $4, $5)
""",
r["context"],
r["exten"],
r["priority"],
r["app"],
r["appdata"],
)

View file

@ -0,0 +1,11 @@
"""Asterisk frame serializer (re-exported from pipecat).
3CX runs through an Asterisk bridge, so the wire format and serializer are
identical to the ARI provider. We re-export rather than import from
``..ari`` to keep providers/__init__ from accidentally creating cross-package
coupling see providers/AGENTS.md.
"""
from pipecat.serializers.asterisk import AsteriskFrameSerializer
__all__ = ["AsteriskFrameSerializer"]

View file

@ -0,0 +1,175 @@
"""3CX transfer/hangup strategies.
Functionally identical to ``providers/ari/strategies.py`` 3CX rides on
top of Asterisk so the bridge-swap and channel-delete REST calls are the
same. Duplicated rather than imported because providers/AGENTS.md forbids
cross-provider imports; a future asterisk_base extraction should
consolidate the two.
"""
from typing import Any, Dict
from loguru import logger
from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy
class ThreeCxBridgeSwapStrategy(TransferStrategy):
"""Bridge-swap transfer over the underlying Asterisk."""
async def execute_transfer(self, context: Dict[str, Any]) -> bool:
try:
import aiohttp
import redis.asyncio as aioredis
from aiohttp import BasicAuth
channel_id = context["channel_id"]
ari_endpoint = context["ari_endpoint"]
app_name = context["app_name"]
app_password = context["app_password"]
if not channel_id or not ari_endpoint:
logger.warning(
"Cannot execute transfer: missing channel_id or ari_endpoint"
)
return False
logger.info(
f"[3CX Transfer] Executing bridge swap for channel {channel_id}"
)
from api.constants import REDIS_URL
from api.db import db_client
from api.services.telephony.call_transfer_manager import (
get_call_transfer_manager,
)
auth = BasicAuth(app_name, app_password)
call_transfer_manager = await get_call_transfer_manager()
transfer_context = (
await call_transfer_manager.find_transfer_context_for_call(channel_id)
)
if not transfer_context:
logger.error(
f"[3CX Transfer] No active transfer context found for caller {channel_id}"
)
return False
redis = aioredis.from_url(REDIS_URL, decode_responses=True)
workflow_run_id = await redis.get(f"ari:channel:{channel_id}")
if not workflow_run_id:
logger.error(
f"[3CX Transfer] No workflow run found for caller {channel_id}"
)
return False
workflow_run = await db_client.get_workflow_run_by_id(int(workflow_run_id))
if not workflow_run or not workflow_run.gathered_context:
logger.error(
f"[3CX Transfer] No workflow context for run {workflow_run_id}"
)
return False
ctx = workflow_run.gathered_context
bridge_id = ctx.get("bridge_id")
ext_channel_id = ctx.get("ext_channel_id")
if not bridge_id or not ext_channel_id:
logger.error(
f"[3CX Transfer] Missing bridge/external channel info: {ctx}"
)
return False
destination_channel_id = transfer_context.call_sid
if not destination_channel_id:
logger.error(
"[3CX Transfer] No destination channel in transfer context"
)
return False
workflow_run.gathered_context["transfer_state"] = "in-progress"
await db_client.update_workflow_run(
run_id=int(workflow_run_id),
gathered_context=workflow_run.gathered_context,
)
async with aiohttp.ClientSession() as session:
add_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/addChannel"
async with session.post(
add_url, auth=auth, params={"channel": destination_channel_id}
) as response:
if response.status not in (200, 204):
error_text = await response.text()
logger.error(
f"[3CX Transfer] Failed to add destination to bridge: "
f"{response.status} {error_text}"
)
return False
remove_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/removeChannel"
async with session.post(
remove_url, auth=auth, params={"channel": ext_channel_id}
) as response:
if response.status not in (200, 204):
error_text = await response.text()
logger.error(
f"[3CX Transfer] Failed to remove external media: "
f"{response.status} {error_text}"
)
hangup_url = f"{ari_endpoint}/ari/channels/{ext_channel_id}"
async with session.delete(hangup_url, auth=auth) as response:
if response.status not in (200, 204, 404):
error_text = await response.text()
logger.warning(
f"[3CX Transfer] Failed to hang up external media: "
f"{response.status} {error_text}"
)
await call_transfer_manager.remove_transfer_context(
transfer_context.transfer_id
)
return True
except Exception as e:
logger.exception(f"Failed to execute 3CX transfer: {e}")
return False
class ThreeCxHangupStrategy(HangupStrategy):
"""Hang up an Asterisk channel that was bridging to the 3CX trunk."""
async def execute_hangup(self, context: Dict[str, Any]) -> bool:
try:
import aiohttp
from aiohttp import BasicAuth
channel_id = context["channel_id"]
ari_endpoint = context["ari_endpoint"]
app_name = context["app_name"]
app_password = context["app_password"]
if not channel_id or not ari_endpoint:
logger.warning(
"Cannot hang up Asterisk channel: missing channel_id or ari_endpoint"
)
return False
endpoint = f"{ari_endpoint}/ari/channels/{channel_id}"
auth = BasicAuth(app_name, app_password)
async with aiohttp.ClientSession() as session:
async with session.delete(endpoint, auth=auth) as response:
if response.status in (200, 204, 404):
return True
error_text = await response.text()
logger.error(
f"Failed to terminate channel {channel_id}: "
f"{response.status} {error_text}"
)
return False
except Exception as e:
logger.exception(f"Failed to hang up Asterisk channel: {e}")
return False

View file

@ -0,0 +1,77 @@
"""3CX transport factory.
3CX rides on top of an Asterisk bridge, so transport is wire-identical to
the ARI provider. The only difference is ``expected_provider="three_cx"``
so ``load_credentials_for_transport`` validates the right config type.
"""
from fastapi import WebSocket
from pipecat.transports.websocket.fastapi import (
FastAPIWebsocketParams,
FastAPIWebsocketTransport,
)
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_mixer import build_audio_out_mixer
from api.services.pipecat.transport_params import realtime_param_overrides
from api.services.telephony.factory import load_credentials_for_transport
from .serializers import AsteriskFrameSerializer
from .strategies import ThreeCxBridgeSwapStrategy, ThreeCxHangupStrategy
async def create_transport(
websocket: WebSocket,
workflow_run_id: int,
audio_config: AudioConfig,
organization_id: int,
*,
ambient_noise_config: dict | None = None,
telephony_configuration_id: int | None = None,
is_realtime: bool = False,
channel_id: str,
):
"""Create a transport for 3CX-via-Asterisk connections."""
config = await load_credentials_for_transport(
organization_id, telephony_configuration_id, expected_provider="three_cx"
)
ari_endpoint = config.get("ari_endpoint")
app_name = config.get("app_name")
app_password = config.get("app_password")
if not ari_endpoint or not app_name or not app_password:
raise ValueError(
f"Incomplete 3CX configuration for organization {organization_id}. "
f"Required: ari_endpoint, app_name, app_password"
)
serializer = AsteriskFrameSerializer(
channel_id=channel_id,
ari_endpoint=ari_endpoint,
app_name=app_name,
app_password=app_password,
transfer_strategy=ThreeCxBridgeSwapStrategy(),
hangup_strategy=ThreeCxHangupStrategy(),
params=AsteriskFrameSerializer.InputParams(
asterisk_sample_rate=audio_config.transport_in_sample_rate,
sample_rate=audio_config.pipeline_sample_rate,
),
)
mixer = await build_audio_out_mixer(
audio_config.transport_out_sample_rate, ambient_noise_config
)
return FastAPIWebsocketTransport(
websocket=websocket,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=audio_config.transport_in_sample_rate,
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_mixer=mixer,
serializer=serializer,
**realtime_param_overrides(is_realtime),
),
)