From 533a873ab7906dcef16bb00ca52c40439da43707 Mon Sep 17 00:00:00 2001 From: stefandsl Date: Tue, 26 May 2026 13:07:50 +0200 Subject: [PATCH 1/3] feat: add 3CX telephony provider with Asterisk ARA provisioning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Registers a new `three_cx` provider that fronts a 3CX cloud PBX through an intermediate Asterisk bridge. Save-time hook writes the matching PJSIP endpoint/aor/auth/registration and dialplan rows to the Asterisk Realtime Architecture Postgres (via `ASTERISK_ARA_DSN`), so a config change in the Dograh UI is immediately picked up by Asterisk without a `pjsip reload`. Strip prefix is honoured at the dialplan layer. Inbound calls are matched back to a configuration by the dialled extension (`account_id_credential_field="extension"`), allowing one shared Asterisk to serve multiple Dograh orgs without collision. Touches `providers/__init__.py` and `schemas/telephony_config.py` only — per `providers/AGENTS.md`. Provider/transport/strategies are duplicated from `ari/` rather than imported, in line with the cross-provider-import prohibition. See `docs/providers/three_cx.md` for the Asterisk ARA setup runbook. Co-Authored-By: Claude Opus 4.7 (1M context) --- api/schemas/telephony_config.py | 8 + api/services/telephony/providers/__init__.py | 1 + .../telephony/providers/three_cx/__init__.py | 129 +++++ .../telephony/providers/three_cx/ara_db.py | 69 +++ .../telephony/providers/three_cx/config.py | 83 ++++ .../telephony/providers/three_cx/dialplan.py | 92 ++++ .../telephony/providers/three_cx/provider.py | 439 ++++++++++++++++++ .../providers/three_cx/provisioning.py | 262 +++++++++++ .../providers/three_cx/serializers.py | 11 + .../providers/three_cx/strategies.py | 175 +++++++ .../telephony/providers/three_cx/transport.py | 77 +++ api/tests/telephony/test_three_cx.py | 367 +++++++++++++++ docs/providers/three_cx.md | 203 ++++++++ 13 files changed, 1916 insertions(+) create mode 100644 api/services/telephony/providers/three_cx/__init__.py create mode 100644 api/services/telephony/providers/three_cx/ara_db.py create mode 100644 api/services/telephony/providers/three_cx/config.py create mode 100644 api/services/telephony/providers/three_cx/dialplan.py create mode 100644 api/services/telephony/providers/three_cx/provider.py create mode 100644 api/services/telephony/providers/three_cx/provisioning.py create mode 100644 api/services/telephony/providers/three_cx/serializers.py create mode 100644 api/services/telephony/providers/three_cx/strategies.py create mode 100644 api/services/telephony/providers/three_cx/transport.py create mode 100644 api/tests/telephony/test_three_cx.py create mode 100644 docs/providers/three_cx.md diff --git a/api/schemas/telephony_config.py b/api/schemas/telephony_config.py index b056400f..8bc9eae6 100644 --- a/api/schemas/telephony_config.py +++ b/api/schemas/telephony_config.py @@ -28,6 +28,10 @@ from api.services.telephony.providers.telnyx.config import ( TelnyxConfigurationRequest, TelnyxConfigurationResponse, ) +from api.services.telephony.providers.three_cx.config import ( + ThreeCxConfigurationRequest, + ThreeCxConfigurationResponse, +) from api.services.telephony.providers.twilio.config import ( TwilioConfigurationRequest, TwilioConfigurationResponse, @@ -50,6 +54,7 @@ TelephonyConfigRequest = Annotated[ CloudonixConfigurationRequest, PlivoConfigurationRequest, TelnyxConfigurationRequest, + ThreeCxConfigurationRequest, TwilioConfigurationRequest, VobizConfigurationRequest, VonageConfigurationRequest, @@ -73,6 +78,7 @@ class TelephonyConfigurationResponse(BaseModel): cloudonix: Optional[CloudonixConfigurationResponse] = None ari: Optional[ARIConfigurationResponse] = None telnyx: Optional[TelnyxConfigurationResponse] = None + three_cx: Optional[ThreeCxConfigurationResponse] = None # --------------------------------------------------------------------------- @@ -142,6 +148,8 @@ __all__ = [ "TelephonyConfigurationResponse", "TelnyxConfigurationRequest", "TelnyxConfigurationResponse", + "ThreeCxConfigurationRequest", + "ThreeCxConfigurationResponse", "TwilioConfigurationRequest", "TwilioConfigurationResponse", "VobizConfigurationRequest", diff --git a/api/services/telephony/providers/__init__.py b/api/services/telephony/providers/__init__.py index 4df4e7f0..d1b8ee6d 100644 --- a/api/services/telephony/providers/__init__.py +++ b/api/services/telephony/providers/__init__.py @@ -11,6 +11,7 @@ from api.services.telephony.providers import ( # noqa: F401 -- import for side cloudonix, plivo, telnyx, + three_cx, twilio, vobiz, vonage, diff --git a/api/services/telephony/providers/three_cx/__init__.py b/api/services/telephony/providers/three_cx/__init__.py new file mode 100644 index 00000000..6f000970 --- /dev/null +++ b/api/services/telephony/providers/three_cx/__init__.py @@ -0,0 +1,129 @@ +"""3CX (PJSIP trunk via Asterisk bridge) telephony provider package.""" + +from typing import Any, Dict + +from api.services.telephony.registry import ( + ProviderSpec, + ProviderUIField, + ProviderUIMetadata, + register, +) + +from .config import ThreeCxConfigurationRequest, ThreeCxConfigurationResponse +from .provider import ThreeCxProvider +from .provisioning import _provision_3cx_trunk, endpoint_id_for +from .transport import create_transport + + +def _config_loader(value: Dict[str, Any]) -> Dict[str, Any]: + """Reshape stored JSONB credentials into the provider constructor dict.""" + return { + "provider": "three_cx", + "ari_endpoint": value.get("ari_endpoint"), + "app_name": value.get("app_name"), + "app_password": value.get("app_password"), + "ws_client_name": value.get("ws_client_name", ""), + "sip_domain": value.get("sip_domain"), + "extension": value.get("extension"), + "strip_prefix": value.get("strip_prefix", ""), + "from_numbers": value.get("from_numbers", []), + } + + +_UI_METADATA = ProviderUIMetadata( + display_name="3CX (Asterisk bridge)", + docs_url="https://docs.dograh.com/integrations/telephony/three-cx", + fields=[ + ProviderUIField( + name="ari_endpoint", + label="ARI Endpoint", + type="text", + description="ARI base URL of the bridging Asterisk (http://host:8088)", + ), + ProviderUIField( + name="app_name", + label="Stasis App Name", + type="text", + description="Stasis application name registered in Asterisk", + ), + ProviderUIField( + name="app_password", + label="ARI Password", + type="password", + sensitive=True, + ), + ProviderUIField( + name="ws_client_name", + label="websocket_client.conf Name", + type="text", + description="websocket_client.conf connection name for externalMedia", + ), + ProviderUIField( + name="sip_domain", + label="3CX SIP Domain", + type="text", + description="Your 3CX cloud host (e.g. 1156.3cx.cloud)", + placeholder="1156.3cx.cloud", + ), + ProviderUIField( + name="extension", + label="3CX Extension", + type="text", + description="Extension number registered for Dograh (e.g. 12611)", + placeholder="12611", + ), + ProviderUIField( + name="sip_password", + label="SIP Password", + type="password", + sensitive=True, + description="SIP auth password for the extension on 3CX", + ), + ProviderUIField( + name="strip_prefix", + label="Strip Prefix (regex)", + type="text", + required=False, + description=( + "Optional regex stripped from outbound numbers before dialling. " + "Only the literal '^\\+' form is supported " + "(Italian deployments use '^\\+39')." + ), + placeholder="^\\+39", + ), + ProviderUIField( + name="from_numbers", + label="From Numbers", + type="string-array", + required=False, + description="E.164 caller-IDs permitted on outbound calls", + ), + ], +) + + +SPEC = ProviderSpec( + name="three_cx", + provider_cls=ThreeCxProvider, + config_loader=_config_loader, + transport_factory=create_transport, + transport_sample_rate=8000, + config_request_cls=ThreeCxConfigurationRequest, + config_response_cls=ThreeCxConfigurationResponse, + ui_metadata=_UI_METADATA, + account_id_credential_field="extension", + preprocess_credentials_on_save=_provision_3cx_trunk, +) + + +register(SPEC) + + +__all__ = [ + "SPEC", + "ThreeCxConfigurationRequest", + "ThreeCxConfigurationResponse", + "ThreeCxProvider", + "create_transport", + "endpoint_id_for", +] diff --git a/api/services/telephony/providers/three_cx/ara_db.py b/api/services/telephony/providers/three_cx/ara_db.py new file mode 100644 index 00000000..89f2a488 --- /dev/null +++ b/api/services/telephony/providers/three_cx/ara_db.py @@ -0,0 +1,69 @@ +"""Async connection pool to the Asterisk Realtime Architecture Postgres. + +Lives separate from Dograh's primary SQLAlchemy engine because the ARA +Postgres is operationally distinct (Asterisk-owned schema, typically a +different host, different credentials). DSN comes from the +``ASTERISK_ARA_DSN`` environment variable. +""" + +from __future__ import annotations + +import os +from typing import Optional + +import asyncpg +from loguru import logger + +_POOL: Optional[asyncpg.Pool] = None +_DSN_ENV = "ASTERISK_ARA_DSN" + + +class AraNotConfiguredError(RuntimeError): + """Raised when ASTERISK_ARA_DSN is missing. + + The 3CX provider can't provision its trunk without an ARA Postgres to + write to — callers translate this into a user-visible HTTP 400 with a + pointer to docs/providers/three_cx.md. + """ + + +async def get_pool() -> asyncpg.Pool: + """Return the lazily-initialised ARA pool. Idempotent across awaits.""" + global _POOL + if _POOL is not None: + return _POOL + + dsn = os.getenv(_DSN_ENV) + if not dsn: + raise AraNotConfiguredError( + f"{_DSN_ENV} not set — 3CX provider needs an Asterisk Realtime " + f"Postgres DSN to provision the PJSIP trunk. See " + f"docs/providers/three_cx.md for setup." + ) + + logger.info(f"[3CX/ARA] opening asyncpg pool to {_dsn_for_log(dsn)}") + _POOL = await asyncpg.create_pool( + dsn=dsn, + min_size=1, + max_size=4, + command_timeout=10, + ) + return _POOL + + +async def close_pool() -> None: + """Close the pool — exposed for test teardown and graceful shutdown.""" + global _POOL + if _POOL is not None: + await _POOL.close() + _POOL = None + + +def _dsn_for_log(dsn: str) -> str: + """Strip the password from a DSN before logging it.""" + if "@" not in dsn or "://" not in dsn: + return "" + scheme, rest = dsn.split("://", 1) + creds, host = rest.split("@", 1) + user = creds.split(":", 1)[0] if ":" in creds else creds + return f"{scheme}://{user}:***@{host}" diff --git a/api/services/telephony/providers/three_cx/config.py b/api/services/telephony/providers/three_cx/config.py new file mode 100644 index 00000000..633dd632 --- /dev/null +++ b/api/services/telephony/providers/three_cx/config.py @@ -0,0 +1,83 @@ +"""3CX (via Asterisk PJSIP trunk) telephony configuration schemas.""" + +from typing import List, Literal, Optional + +from pydantic import BaseModel, Field, field_validator + + +class ThreeCxConfigurationRequest(BaseModel): + """Request schema for a 3CX trunk fronted by an Asterisk ARA instance. + + The provider owns two distinct credential groups: + + * **Asterisk side** (``ari_endpoint``, ``app_name``, ``app_password``, + ``ws_client_name``) — how Dograh's REST + externalMedia loop talks to + the bridging Asterisk box at call time. Identical in role to the ARI + provider. + * **3CX side** (``sip_domain``, ``extension``, ``sip_password``, + ``strip_prefix``) — the upstream PBX peer credentials. Dograh never + speaks SIP itself; these are consumed at save time by + ``preprocess_credentials_on_save`` to provision the matching PJSIP + endpoint/aor/auth/registration rows on the Asterisk ARA Postgres. + """ + + provider: Literal["three_cx"] = Field(default="three_cx") + + ari_endpoint: str = Field( + ..., description="ARI base URL of the bridging Asterisk (e.g., http://asterisk:8088)" + ) + app_name: str = Field( + ..., description="Stasis application name registered in Asterisk" + ) + app_password: str = Field(..., description="ARI user password") + ws_client_name: str = Field( + default="", + description="websocket_client.conf connection name for externalMedia", + ) + + sip_domain: str = Field( + ..., description="3CX SIP host/domain (e.g., 1156.3cx.cloud)" + ) + extension: str = Field(..., description="3CX extension number (e.g., 12611)") + sip_password: str = Field(..., description="SIP auth password for the extension") + strip_prefix: str = Field( + default="", + description=( + "Optional regex stripped from outbound destinations before the call " + "hits the trunk. Italian deployments typically use '^\\+39'." + ), + ) + + from_numbers: List[str] = Field( + default_factory=list, + description="E.164 numbers permitted as caller-id for outbound calls", + ) + + @field_validator("sip_domain") + @classmethod + def _strip_sip_domain(cls, v: str) -> str: + return (v or "").strip().lower() + + @field_validator("extension") + @classmethod + def _strip_extension(cls, v: str) -> str: + return (v or "").strip() + + +class ThreeCxConfigurationResponse(BaseModel): + """Response schema for a 3CX configuration. + + ``app_password`` and ``sip_password`` are masked by the org route layer + before serialization — see ``ProviderUIField.sensitive`` in __init__.py. + """ + + provider: Literal["three_cx"] = Field(default="three_cx") + ari_endpoint: str + app_name: str + app_password: str # Masked + ws_client_name: str = "" + sip_domain: str + extension: str + sip_password: str # Masked + strip_prefix: str = "" + from_numbers: List[str] diff --git a/api/services/telephony/providers/three_cx/dialplan.py b/api/services/telephony/providers/three_cx/dialplan.py new file mode 100644 index 00000000..06313a86 --- /dev/null +++ b/api/services/telephony/providers/three_cx/dialplan.py @@ -0,0 +1,92 @@ +"""Builds Asterisk dialplan rows for a 3CX trunk. + +Two contexts are generated per trunk: + +* ``-outbound`` — dialed by the Stasis app when Dograh + originates a call. Honours ``strip_prefix`` by translating the regex + to an Asterisk pattern-match exten and using ``${EXTEN:N}`` to skip + the matched prefix on the way out. +* ``-inbound`` — the ``context=`` on the PJSIP endpoint. + Routes any incoming call from the trunk straight into the Stasis + app so Dograh's ari_manager picks it up. + +We deliberately keep the dialplan minimal — anything fancier (IVR, +office-hours routing) belongs in a hand-written context the admin can +include before/after this generated one. +""" + +from __future__ import annotations + +import re +from typing import List, Tuple + +# Asterisk understands its own ad-hoc pattern syntax — not POSIX/PCRE +# regex. We translate the small subset Italian deployments need +# (``^\+39``) and fall back to a verbatim match when the prefix is empty. +_SUPPORTED_PREFIX_RE = re.compile(r"^\^\\?\+(\d+)$") + + +def _prefix_to_pattern(strip_prefix: str) -> Tuple[str, int]: + """Translate a small regex into (Asterisk extension pattern, chars-to-skip). + + Examples + -------- + >>> _prefix_to_pattern("^\\+39") + ('_+39N.', 3) + >>> _prefix_to_pattern("") + ('_X.', 0) + """ + if not strip_prefix: + return ("_X.", 0) + m = _SUPPORTED_PREFIX_RE.match(strip_prefix) + if not m: + raise ValueError( + f"Unsupported strip_prefix regex {strip_prefix!r}. " + f"Only literal '^\\+' is supported." + ) + digits = m.group(1) + return (f"_+{digits}N.", len(digits) + 1) # +1 for the literal '+' + + +def build_dialplan_rows( + *, + endpoint_id: str, + extension: str, + stasis_app: str, + strip_prefix: str, +) -> List[dict]: + """Return ARA ``extensions`` rows for this trunk's inbound + outbound contexts.""" + pattern, skip = _prefix_to_pattern(strip_prefix) + dest = f"${{EXTEN:{skip}}}" if skip else "${EXTEN}" + + outbound_context = f"{endpoint_id}-outbound" + inbound_context = f"{endpoint_id}-inbound" + + return [ + { + "context": outbound_context, + "exten": pattern, + "priority": 1, + "app": "Dial", + "appdata": f"PJSIP/{dest}@{endpoint_id},60", + }, + { + "context": inbound_context, + "exten": extension, + "priority": 1, + "app": "Stasis", + "appdata": f"{stasis_app},inbound,{endpoint_id}", + }, + { + "context": inbound_context, + "exten": "_X.", + "priority": 1, + "app": "Stasis", + "appdata": f"{stasis_app},inbound,{endpoint_id}", + }, + ] + + +def outbound_context_for(endpoint_id: str) -> str: + """The dialplan context name the Stasis app should Originate into.""" + return f"{endpoint_id}-outbound" diff --git a/api/services/telephony/providers/three_cx/provider.py b/api/services/telephony/providers/three_cx/provider.py new file mode 100644 index 00000000..03cdc049 --- /dev/null +++ b/api/services/telephony/providers/three_cx/provider.py @@ -0,0 +1,439 @@ +"""3CX telephony provider — Asterisk PJSIP trunk to a 3CX cloud PBX. + +Functionally a specialisation of ARI: the runtime call control flow is +identical (REST originate + Stasis externalMedia), but the provider +carries the 3CX trunk credentials and matches inbound calls back to a +configuration by ``extension``. + +We duplicate the ARI provider body rather than subclassing it because +``providers/AGENTS.md`` forbids cross-provider imports. A future +``services/telephony/asterisk_base.py`` extraction should consolidate the +shared logic. +""" + +import json +from typing import TYPE_CHECKING, Any, Dict, List, Optional +from urllib.parse import urlparse + +import aiohttp +from fastapi import HTTPException +from loguru import logger + +from api.db import db_client +from api.services.telephony.base import ( + CallInitiationResult, + NormalizedInboundData, + TelephonyProvider, +) + +if TYPE_CHECKING: + from fastapi import WebSocket + + +class ThreeCxProvider(TelephonyProvider): + """3CX-over-Asterisk implementation of TelephonyProvider.""" + + PROVIDER_NAME = "three_cx" + WEBHOOK_ENDPOINT = None # 3CX uses WebSocket events via Asterisk, not webhooks + + def __init__(self, config: Dict[str, Any]): + """Initialise from the normalised config dict produced by _config_loader.""" + self.ari_endpoint = (config.get("ari_endpoint") or "").rstrip("/") + self.app_name = config.get("app_name", "") + self.app_password = config.get("app_password", "") + self.from_numbers = config.get("from_numbers", []) + + # 3CX trunk identity — carried for inbound matching and for the + # provisioning hook to address the right ARA rows. Not used at + # runtime by REST call control (Asterisk owns the SIP leg). + self.sip_domain = (config.get("sip_domain") or "").strip().lower() + self.extension = (config.get("extension") or "").strip() + self.strip_prefix = config.get("strip_prefix", "") + + if isinstance(self.from_numbers, str): + self.from_numbers = [self.from_numbers] + + self.base_url = f"{self.ari_endpoint}/ari" + + def _get_auth(self) -> aiohttp.BasicAuth: + return aiohttp.BasicAuth(self.app_name, self.app_password) + + async def initiate_call( + self, + to_number: str, + webhook_url: str, + workflow_run_id: Optional[int] = None, + from_number: Optional[str] = None, + **kwargs: Any, + ) -> CallInitiationResult: + """Originate an outbound call via the bridging Asterisk. + + The dialled number is routed through the generated outbound dialplan + context (``-outbound``) so the ``strip_prefix`` regex + the admin saved is honoured at the dialplan layer, not in Python. + """ + if not self.validate_config(): + raise ValueError("3CX provider not properly configured") + + endpoint = f"{self.base_url}/channels" + + # Local-channel into the generated outbound context, which contains + # the strip_prefix-aware Dial(PJSIP/...@) row. + endpoint_id = self._endpoint_id() + sip_endpoint = f"Local/{to_number}@{endpoint_id}-outbound" + + params = { + "endpoint": sip_endpoint, + "app": self.app_name, + "appArgs": ",".join( + filter( + None, + [ + f"workflow_run_id={workflow_run_id}", + f"workflow_id={kwargs.get('workflow_id', '')}", + f"user_id={kwargs.get('user_id', '')}", + ], + ) + ), + } + + if from_number: + params["callerId"] = from_number + + logger.info( + f"[3CX] Initiating call to {to_number} via {sip_endpoint} " + f"(workflow_run_id={workflow_run_id})" + ) + + async with aiohttp.ClientSession() as session: + async with session.post( + endpoint, + params=params, + auth=self._get_auth(), + ) as response: + response_text = await response.text() + + if response.status != 200: + logger.error( + f"[3CX] Channel creation failed: " + f"HTTP {response.status} - {response_text}" + ) + raise HTTPException( + status_code=response.status, + detail=f"Failed to create 3CX channel: {response_text}", + ) + + response_data = json.loads(response_text) + channel_id = response_data.get("id", "") + + return CallInitiationResult( + call_id=channel_id, + status=response_data.get("state", "created"), + caller_number=from_number, + provider_metadata={ + "call_id": channel_id, + "channel_name": response_data.get("name", ""), + }, + raw_response=response_data, + ) + + async def get_call_status(self, call_id: str) -> Dict[str, Any]: + if not self.validate_config(): + raise ValueError("3CX provider not properly configured") + url = f"{self.base_url}/channels/{call_id}" + async with aiohttp.ClientSession() as session: + async with session.get(url, auth=self._get_auth()) as response: + if response.status != 200: + raise Exception( + f"Failed to get channel status: {await response.text()}" + ) + return await response.json() + + async def get_available_phone_numbers(self) -> List[str]: + return self.from_numbers + + def validate_config(self) -> bool: + """Asterisk-side credentials are the only ones required at runtime. + + 3CX-side credentials (``sip_password`` etc.) are consumed at save time + by the provisioning hook; they're not needed for REST call control. + """ + return bool(self.ari_endpoint and self.app_name and self.app_password) + + async def verify_webhook_signature( + self, url: str, params: Dict[str, Any], signature: str + ) -> bool: + return True + + async def get_webhook_response( + self, workflow_id: int, user_id: int, workflow_run_id: int + ) -> str: + logger.warning( + "get_webhook_response called for 3CX — not applicable, " + "control plane is Asterisk REST." + ) + return "" + + async def get_call_cost(self, call_id: str) -> Dict[str, Any]: + return { + "cost_usd": 0.0, + "duration": 0, + "status": "unknown", + "error": "3CX does not surface call cost to Dograh", + } + + def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]: + state_map = { + "Up": "answered", + "Down": "completed", + "Ringing": "ringing", + "Ring": "ringing", + "Busy": "busy", + "Unavailable": "failed", + } + channel_state = data.get("channel", {}).get("state", "") + event_type = data.get("type", "") + + if event_type == "StasisStart": + status = "answered" + elif event_type in ("StasisEnd", "ChannelDestroyed"): + status = "completed" + else: + status = state_map.get(channel_state, channel_state.lower()) + + channel = data.get("channel", {}) + return { + "call_id": channel.get("id", ""), + "status": status, + "from_number": channel.get("caller", {}).get("number"), + "to_number": channel.get("dialplan", {}).get("exten"), + "direction": None, + "duration": None, + "extra": data, + } + + async def handle_websocket( + self, + websocket: "WebSocket", + workflow_id: int, + user_id: int, + workflow_run_id: int, + ) -> None: + from api.services.pipecat.run_pipeline import run_pipeline_telephony + + workflow_run = await db_client.get_workflow_run(workflow_run_id, user_id) + channel_id = "" + if workflow_run and workflow_run.gathered_context: + channel_id = workflow_run.gathered_context.get("call_id", "") + + logger.info( + f"[3CX] Starting pipeline for workflow_run {workflow_run_id}, " + f"channel={channel_id}" + ) + + await run_pipeline_telephony( + websocket, + provider_name=self.PROVIDER_NAME, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + user_id=user_id, + call_id=channel_id, + transport_kwargs={"channel_id": channel_id}, + ) + + # ======== INBOUND CALL METHODS ======== + + @classmethod + def can_handle_webhook( + cls, webhook_data: Dict[str, Any], headers: Dict[str, str] + ) -> bool: + """3CX uses no HTTP webhook layer — inbound arrives via Stasis events.""" + return False + + @staticmethod + def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData: + """Parse a Stasis event into normalised inbound data. + + ``account_id`` is populated with the dialled extension so the + inbound dispatcher can match it against + ``credentials['extension']`` and pick the right 3CX configuration + when multiple coexist in one org. + """ + channel = webhook_data.get("channel", {}) + caller = channel.get("caller", {}) + exten = channel.get("dialplan", {}).get("exten", "") + + return NormalizedInboundData( + provider=ThreeCxProvider.PROVIDER_NAME, + call_id=channel.get("id", ""), + from_number=caller.get("number", ""), + to_number=exten, + direction="inbound", + call_status=channel.get("state", ""), + account_id=exten or None, + raw_data=webhook_data, + ) + + @staticmethod + def validate_account_id(config_data: dict, webhook_account_id: str) -> bool: + """Match the dialled extension against the saved trunk's extension.""" + stored = (config_data or {}).get("extension") + if not stored or not webhook_account_id: + return False + return stored == webhook_account_id + + async def verify_inbound_signature( + self, + url: str, + webhook_data: Dict[str, Any], + headers: Dict[str, str], + body: str = "", + ) -> bool: + """3CX authenticates via the Asterisk WebSocket creds; no payload signature.""" + return True + + async def start_inbound_stream( + self, + *, + websocket_url: str, + workflow_run_id: int, + normalized_data, + backend_endpoint: str, + ): + from fastapi import Response + + return Response(content="", status_code=204) + + @staticmethod + def generate_error_response(error_type: str, message: str) -> tuple: + from fastapi import Response + + return Response( + content=json.dumps({"error": error_type, "message": message}), + media_type="application/json", + ) + + # ======== CALL TRANSFER METHODS ======== + + def supports_transfers(self) -> bool: + return True + + async def transfer_call( + self, + destination: str, + transfer_id: str, + conference_name: str, + timeout: int = 30, + **kwargs: Any, + ) -> Dict[str, Any]: + """Transfer by originating a destination channel and bridge-swapping it in.""" + if not self.validate_config(): + raise ValueError("3CX provider not properly configured") + + from api.services.telephony.call_transfer_manager import ( + get_call_transfer_manager, + ) + + call_transfer_manager = await get_call_transfer_manager() + + endpoint_id = self._endpoint_id() + sip_endpoint = f"Local/{destination}@{endpoint_id}-outbound" + + app_args = f"transfer,{transfer_id}" + + try: + endpoint = f"{self.base_url}/channels" + params = { + "endpoint": sip_endpoint, + "app": self.app_name, + "appArgs": app_args, + "timeout": timeout, + } + + async with aiohttp.ClientSession() as session: + async with session.post( + endpoint, params=params, auth=self._get_auth() + ) as response: + response_text = await response.text() + if response.status != 200: + await call_transfer_manager.remove_transfer_context( + transfer_id + ) + raise Exception( + f"3CX channel creation failed: " + f"{response.status} {response_text}" + ) + result = json.loads(response_text) + + destination_channel_id = result.get("id", "") + if not destination_channel_id: + await call_transfer_manager.remove_transfer_context(transfer_id) + raise Exception("Failed to create destination channel") + + await call_transfer_manager.store_transfer_channel_mapping( + destination_channel_id, transfer_id + ) + + return { + "call_sid": destination_channel_id, + "status": "initiated", + "provider": self.PROVIDER_NAME, + "raw_response": result, + } + + except Exception as e: + logger.error(f"[3CX Transfer] Failed: {e}") + await call_transfer_manager.remove_transfer_context(transfer_id) + raise + + # ======== 3CX-SPECIFIC HELPERS ======== + + def _endpoint_id(self) -> str: + """Globally unique Asterisk endpoint name for this trunk. + + Matches the naming used by the provisioning hook so dialplan + REST + agree on which PJSIP endpoint to address. See provisioning.py. + """ + from .provisioning import endpoint_id_for + + return endpoint_id_for(self.sip_domain, self.extension) + + async def hangup_channel(self, channel_id: str, reason: str = "normal") -> bool: + endpoint = f"{self.base_url}/channels/{channel_id}" + params = {"reason_code": reason} + try: + async with aiohttp.ClientSession() as session: + async with session.delete( + endpoint, params=params, auth=self._get_auth() + ) as response: + if response.status in (200, 204): + return True + logger.error( + f"[3CX] Failed to hangup channel {channel_id}: " + f"{await response.text()}" + ) + return False + except Exception as e: + logger.error(f"[3CX] Exception hanging up channel {channel_id}: {e}") + return False + + async def answer_channel(self, channel_id: str) -> bool: + endpoint = f"{self.base_url}/channels/{channel_id}/answer" + try: + async with aiohttp.ClientSession() as session: + async with session.post(endpoint, auth=self._get_auth()) as response: + return response.status in (200, 204) + except Exception as e: + logger.error(f"[3CX] Exception answering channel {channel_id}: {e}") + return False + + def get_ws_url(self) -> str: + """ARI WebSocket URL for the standalone event listener (ari_manager).""" + parsed = urlparse(self.ari_endpoint) + ws_scheme = "wss" if parsed.scheme == "https" else "ws" + return ( + f"{ws_scheme}://{parsed.netloc}/ari/events" + f"?api_key={self.app_name}:{self.app_password}" + f"&app={self.app_name}" + f"&subscribeAll=true" + ) diff --git a/api/services/telephony/providers/three_cx/provisioning.py b/api/services/telephony/providers/three_cx/provisioning.py new file mode 100644 index 00000000..cd0c9c95 --- /dev/null +++ b/api/services/telephony/providers/three_cx/provisioning.py @@ -0,0 +1,262 @@ +"""Provision a 3CX PJSIP trunk on the bridging Asterisk via ARA Postgres. + +Called by ``ProviderSpec.preprocess_credentials_on_save`` whenever a +TelephonyConfiguration of type ``three_cx`` is created or updated. Writes +the standard six-table PJSIP realtime set: + +* ``ps_auths`` — userpass auth for outbound REGISTER + inbound 401 challenge +* ``ps_aors`` — single contact, qualify keepalive +* ``ps_endpoints`` — codec list, dialplan context, auth/aor references +* ``ps_registrations`` — outbound REGISTER toward the 3CX cloud SBC +* ``extensions`` (x N) — dialplan rows produced by ``dialplan.build_dialplan_rows`` + +Idempotent on re-save: every row keyed by the deterministic +``endpoint_id_for(sip_domain, extension)`` is deleted first and then +re-inserted in the same transaction. The preprocessor is allowed to do +I/O (registry.py docstring) but must remain re-entrant from the route +layer's point of view. +""" + +from __future__ import annotations + +import re +from typing import Any, Dict + +from fastapi import HTTPException +from loguru import logger + +from .ara_db import AraNotConfiguredError, get_pool +from .dialplan import build_dialplan_rows + +# Stasis app name as configured in the bridging Asterisk's +# websocket_client.conf. Mirrors the ``app_name`` field on the +# configuration — see runbook §1. +_STASIS_APP_KEY = "app_name" + +# Default codecs: G.711a + G.711μ cover 3CX defaults; "ulaw,alaw" is the +# ordered allow list, "all" the disallow base. +_DEFAULT_ALLOW = "ulaw,alaw" +_DEFAULT_DISALLOW = "all" + +# Asterisk-side transport name configured by the admin in pjsip.conf +# (e.g. ``transport-udp``). The runbook tells the admin how to set this +# up; the provider just references it by name. Override per-deployment +# via env var on the calling process if necessary. +_TRANSPORT_NAME_DEFAULT = "transport-udp" + + +# --------------------------------------------------------------------------- +# Public surface +# --------------------------------------------------------------------------- + + +def endpoint_id_for(sip_domain: str, extension: str) -> str: + """Deterministic, globally-unique Asterisk endpoint id for this trunk. + + Form: ``dograh__``. Two TelephonyConfigurations + can't legitimately collide because two Asterisks can't simultaneously + register the same (domain, extension) pair upstream anyway. + + >>> endpoint_id_for('1156.3cx.cloud', '12611') + 'dograh_1156_3cx_cloud_12611' + """ + slug = re.sub(r"[^a-z0-9]+", "_", (sip_domain or "").lower()).strip("_") + ext = re.sub(r"[^A-Za-z0-9]+", "", extension or "") + if not slug or not ext: + raise ValueError( + f"Cannot derive endpoint_id from sip_domain={sip_domain!r} " + f"extension={extension!r}" + ) + return f"dograh_{slug}_{ext}" + + +async def _provision_3cx_trunk(credentials: Dict[str, Any]) -> Dict[str, Any]: + """Preprocessor hook — writes the ARA rows for this trunk. + + Returns the credentials dict unchanged (the provider re-derives + ``endpoint_id`` deterministically at runtime, so nothing extra needs + to be persisted). + + Raises ``HTTPException`` on validation failure or ARA write failure so + the route layer aborts the DB save — matches the Cloudonix pattern. + """ + required = ("sip_domain", "extension", "sip_password", "app_name") + missing = [k for k in required if not credentials.get(k)] + if missing: + raise HTTPException( + status_code=400, + detail=f"3CX provision: missing required credential(s): {missing}", + ) + + sip_domain = credentials["sip_domain"].strip().lower() + extension = credentials["extension"].strip() + sip_password = credentials["sip_password"] + stasis_app = credentials[_STASIS_APP_KEY] + strip_prefix = credentials.get("strip_prefix", "") + + endpoint_id = endpoint_id_for(sip_domain, extension) + transport_name = credentials.get("transport_name", _TRANSPORT_NAME_DEFAULT) + + try: + pool = await get_pool() + except AraNotConfiguredError as e: + raise HTTPException(status_code=400, detail=str(e)) + + dialplan_rows = build_dialplan_rows( + endpoint_id=endpoint_id, + extension=extension, + stasis_app=stasis_app, + strip_prefix=strip_prefix, + ) + + try: + async with pool.acquire() as conn: + async with conn.transaction(): + await _delete_existing(conn, endpoint_id) + await _insert_auth(conn, endpoint_id, extension, sip_password) + await _insert_aor(conn, endpoint_id) + await _insert_endpoint( + conn, endpoint_id, transport_name, sip_domain + ) + await _insert_registration( + conn, + endpoint_id=endpoint_id, + transport_name=transport_name, + sip_domain=sip_domain, + extension=extension, + ) + await _insert_extensions(conn, dialplan_rows) + except Exception as e: + logger.exception(f"[3CX/ARA] provisioning failed for {endpoint_id}: {e}") + raise HTTPException( + status_code=502, + detail=( + f"3CX provisioning failed while writing to Asterisk ARA: {e}. " + f"No TelephonyConfiguration was saved." + ), + ) + + logger.info( + f"[3CX/ARA] provisioned endpoint={endpoint_id} " + f"(dialplan rows: {len(dialplan_rows)})" + ) + return credentials + + +async def _deprovision_3cx_trunk(credentials: Dict[str, Any]) -> None: + """Remove all ARA rows for a given trunk. + + Not wired into a hook today — the registry only exposes the + save-time hook. Exposed as a callable so a future + ``post_delete`` extension or admin tooling can use it. + """ + sip_domain = (credentials.get("sip_domain") or "").strip().lower() + extension = (credentials.get("extension") or "").strip() + if not sip_domain or not extension: + return + endpoint_id = endpoint_id_for(sip_domain, extension) + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.transaction(): + await _delete_existing(conn, endpoint_id) + logger.info(f"[3CX/ARA] deprovisioned endpoint={endpoint_id}") + + +# --------------------------------------------------------------------------- +# Per-table writers +# --------------------------------------------------------------------------- + + +async def _delete_existing(conn, endpoint_id: str) -> None: + """Strip every ARA row owned by this endpoint_id. Idempotent.""" + await conn.execute("DELETE FROM ps_registrations WHERE id = $1", endpoint_id) + await conn.execute("DELETE FROM ps_endpoints WHERE id = $1", endpoint_id) + await conn.execute("DELETE FROM ps_aors WHERE id = $1", endpoint_id) + await conn.execute("DELETE FROM ps_auths WHERE id = $1", endpoint_id) + # Dialplan rows live under two derived contexts. + await conn.execute( + "DELETE FROM extensions WHERE context IN ($1, $2)", + f"{endpoint_id}-inbound", + f"{endpoint_id}-outbound", + ) + + +async def _insert_auth(conn, endpoint_id: str, username: str, password: str) -> None: + await conn.execute( + """ + INSERT INTO ps_auths (id, auth_type, username, password) + VALUES ($1, 'userpass', $2, $3) + """, + endpoint_id, + username, + password, + ) + + +async def _insert_aor(conn, endpoint_id: str) -> None: + await conn.execute( + """ + INSERT INTO ps_aors (id, max_contacts, qualify_frequency) + VALUES ($1, 1, 60) + """, + endpoint_id, + ) + + +async def _insert_endpoint( + conn, endpoint_id: str, transport_name: str, sip_domain: str +) -> None: + await conn.execute( + """ + INSERT INTO ps_endpoints ( + id, transport, aors, auth, context, + disallow, allow, from_domain, identify_by + ) VALUES ($1, $2, $1, $1, $3, $4, $5, $6, 'auth_username,username') + """, + endpoint_id, + transport_name, + f"{endpoint_id}-inbound", + _DEFAULT_DISALLOW, + _DEFAULT_ALLOW, + sip_domain, + ) + + +async def _insert_registration( + conn, + *, + endpoint_id: str, + transport_name: str, + sip_domain: str, + extension: str, +) -> None: + server_uri = f"sip:{sip_domain}" + client_uri = f"sip:{extension}@{sip_domain}" + await conn.execute( + """ + INSERT INTO ps_registrations ( + id, transport, outbound_auth, server_uri, client_uri, + contact_user, expiration, retry_interval + ) VALUES ($1, $2, $1, $3, $4, $5, 300, 60) + """, + endpoint_id, + transport_name, + server_uri, + client_uri, + extension, + ) + + +async def _insert_extensions(conn, rows: list[dict]) -> None: + for r in rows: + await conn.execute( + """ + INSERT INTO extensions (context, exten, priority, app, appdata) + VALUES ($1, $2, $3, $4, $5) + """, + r["context"], + r["exten"], + r["priority"], + r["app"], + r["appdata"], + ) diff --git a/api/services/telephony/providers/three_cx/serializers.py b/api/services/telephony/providers/three_cx/serializers.py new file mode 100644 index 00000000..84179d91 --- /dev/null +++ b/api/services/telephony/providers/three_cx/serializers.py @@ -0,0 +1,11 @@ +"""Asterisk frame serializer (re-exported from pipecat). + +3CX runs through an Asterisk bridge, so the wire format and serializer are +identical to the ARI provider. We re-export rather than import from +``..ari`` to keep providers/__init__ from accidentally creating cross-package +coupling — see providers/AGENTS.md. +""" + +from pipecat.serializers.asterisk import AsteriskFrameSerializer + +__all__ = ["AsteriskFrameSerializer"] diff --git a/api/services/telephony/providers/three_cx/strategies.py b/api/services/telephony/providers/three_cx/strategies.py new file mode 100644 index 00000000..2ecd3c1c --- /dev/null +++ b/api/services/telephony/providers/three_cx/strategies.py @@ -0,0 +1,175 @@ +"""3CX transfer/hangup strategies. + +Functionally identical to ``providers/ari/strategies.py`` — 3CX rides on +top of Asterisk so the bridge-swap and channel-delete REST calls are the +same. Duplicated rather than imported because providers/AGENTS.md forbids +cross-provider imports; a future asterisk_base extraction should +consolidate the two. +""" + +from typing import Any, Dict + +from loguru import logger +from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy + + +class ThreeCxBridgeSwapStrategy(TransferStrategy): + """Bridge-swap transfer over the underlying Asterisk.""" + + async def execute_transfer(self, context: Dict[str, Any]) -> bool: + try: + import aiohttp + import redis.asyncio as aioredis + from aiohttp import BasicAuth + + channel_id = context["channel_id"] + ari_endpoint = context["ari_endpoint"] + app_name = context["app_name"] + app_password = context["app_password"] + + if not channel_id or not ari_endpoint: + logger.warning( + "Cannot execute transfer: missing channel_id or ari_endpoint" + ) + return False + + logger.info( + f"[3CX Transfer] Executing bridge swap for channel {channel_id}" + ) + + from api.constants import REDIS_URL + from api.db import db_client + from api.services.telephony.call_transfer_manager import ( + get_call_transfer_manager, + ) + + auth = BasicAuth(app_name, app_password) + + call_transfer_manager = await get_call_transfer_manager() + + transfer_context = ( + await call_transfer_manager.find_transfer_context_for_call(channel_id) + ) + if not transfer_context: + logger.error( + f"[3CX Transfer] No active transfer context found for caller {channel_id}" + ) + return False + + redis = aioredis.from_url(REDIS_URL, decode_responses=True) + workflow_run_id = await redis.get(f"ari:channel:{channel_id}") + if not workflow_run_id: + logger.error( + f"[3CX Transfer] No workflow run found for caller {channel_id}" + ) + return False + + workflow_run = await db_client.get_workflow_run_by_id(int(workflow_run_id)) + if not workflow_run or not workflow_run.gathered_context: + logger.error( + f"[3CX Transfer] No workflow context for run {workflow_run_id}" + ) + return False + + ctx = workflow_run.gathered_context + bridge_id = ctx.get("bridge_id") + ext_channel_id = ctx.get("ext_channel_id") + + if not bridge_id or not ext_channel_id: + logger.error( + f"[3CX Transfer] Missing bridge/external channel info: {ctx}" + ) + return False + + destination_channel_id = transfer_context.call_sid + if not destination_channel_id: + logger.error( + "[3CX Transfer] No destination channel in transfer context" + ) + return False + + workflow_run.gathered_context["transfer_state"] = "in-progress" + await db_client.update_workflow_run( + run_id=int(workflow_run_id), + gathered_context=workflow_run.gathered_context, + ) + + async with aiohttp.ClientSession() as session: + add_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/addChannel" + async with session.post( + add_url, auth=auth, params={"channel": destination_channel_id} + ) as response: + if response.status not in (200, 204): + error_text = await response.text() + logger.error( + f"[3CX Transfer] Failed to add destination to bridge: " + f"{response.status} {error_text}" + ) + return False + + remove_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/removeChannel" + async with session.post( + remove_url, auth=auth, params={"channel": ext_channel_id} + ) as response: + if response.status not in (200, 204): + error_text = await response.text() + logger.error( + f"[3CX Transfer] Failed to remove external media: " + f"{response.status} {error_text}" + ) + + hangup_url = f"{ari_endpoint}/ari/channels/{ext_channel_id}" + async with session.delete(hangup_url, auth=auth) as response: + if response.status not in (200, 204, 404): + error_text = await response.text() + logger.warning( + f"[3CX Transfer] Failed to hang up external media: " + f"{response.status} {error_text}" + ) + + await call_transfer_manager.remove_transfer_context( + transfer_context.transfer_id + ) + return True + + except Exception as e: + logger.exception(f"Failed to execute 3CX transfer: {e}") + return False + + +class ThreeCxHangupStrategy(HangupStrategy): + """Hang up an Asterisk channel that was bridging to the 3CX trunk.""" + + async def execute_hangup(self, context: Dict[str, Any]) -> bool: + try: + import aiohttp + from aiohttp import BasicAuth + + channel_id = context["channel_id"] + ari_endpoint = context["ari_endpoint"] + app_name = context["app_name"] + app_password = context["app_password"] + + if not channel_id or not ari_endpoint: + logger.warning( + "Cannot hang up Asterisk channel: missing channel_id or ari_endpoint" + ) + return False + + endpoint = f"{ari_endpoint}/ari/channels/{channel_id}" + auth = BasicAuth(app_name, app_password) + + async with aiohttp.ClientSession() as session: + async with session.delete(endpoint, auth=auth) as response: + if response.status in (200, 204, 404): + return True + error_text = await response.text() + logger.error( + f"Failed to terminate channel {channel_id}: " + f"{response.status} {error_text}" + ) + return False + + except Exception as e: + logger.exception(f"Failed to hang up Asterisk channel: {e}") + return False diff --git a/api/services/telephony/providers/three_cx/transport.py b/api/services/telephony/providers/three_cx/transport.py new file mode 100644 index 00000000..87acad7f --- /dev/null +++ b/api/services/telephony/providers/three_cx/transport.py @@ -0,0 +1,77 @@ +"""3CX transport factory. + +3CX rides on top of an Asterisk bridge, so transport is wire-identical to +the ARI provider. The only difference is ``expected_provider="three_cx"`` +so ``load_credentials_for_transport`` validates the right config type. +""" + +from fastapi import WebSocket +from pipecat.transports.websocket.fastapi import ( + FastAPIWebsocketParams, + FastAPIWebsocketTransport, +) + +from api.services.pipecat.audio_config import AudioConfig +from api.services.pipecat.audio_mixer import build_audio_out_mixer +from api.services.pipecat.transport_params import realtime_param_overrides +from api.services.telephony.factory import load_credentials_for_transport + +from .serializers import AsteriskFrameSerializer +from .strategies import ThreeCxBridgeSwapStrategy, ThreeCxHangupStrategy + + +async def create_transport( + websocket: WebSocket, + workflow_run_id: int, + audio_config: AudioConfig, + organization_id: int, + *, + ambient_noise_config: dict | None = None, + telephony_configuration_id: int | None = None, + is_realtime: bool = False, + channel_id: str, +): + """Create a transport for 3CX-via-Asterisk connections.""" + config = await load_credentials_for_transport( + organization_id, telephony_configuration_id, expected_provider="three_cx" + ) + + ari_endpoint = config.get("ari_endpoint") + app_name = config.get("app_name") + app_password = config.get("app_password") + + if not ari_endpoint or not app_name or not app_password: + raise ValueError( + f"Incomplete 3CX configuration for organization {organization_id}. " + f"Required: ari_endpoint, app_name, app_password" + ) + + serializer = AsteriskFrameSerializer( + channel_id=channel_id, + ari_endpoint=ari_endpoint, + app_name=app_name, + app_password=app_password, + transfer_strategy=ThreeCxBridgeSwapStrategy(), + hangup_strategy=ThreeCxHangupStrategy(), + params=AsteriskFrameSerializer.InputParams( + asterisk_sample_rate=audio_config.transport_in_sample_rate, + sample_rate=audio_config.pipeline_sample_rate, + ), + ) + + mixer = await build_audio_out_mixer( + audio_config.transport_out_sample_rate, ambient_noise_config + ) + + return FastAPIWebsocketTransport( + websocket=websocket, + params=FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + audio_in_sample_rate=audio_config.transport_in_sample_rate, + audio_out_sample_rate=audio_config.transport_out_sample_rate, + audio_out_mixer=mixer, + serializer=serializer, + **realtime_param_overrides(is_realtime), + ), + ) diff --git a/api/tests/telephony/test_three_cx.py b/api/tests/telephony/test_three_cx.py new file mode 100644 index 00000000..39b1fa20 --- /dev/null +++ b/api/tests/telephony/test_three_cx.py @@ -0,0 +1,367 @@ +"""Unit tests for the 3CX telephony provider. + +Scope: +* Config schemas (validators, mask/unmask roundtrip via discriminated union) +* Pure-Python helpers (endpoint_id, dialplan row generation) +* Provider methods that don't need a transport (validate_config, + parse_inbound_webhook, validate_account_id) +* Provisioning hook with mocked asyncpg pool — no real Postgres +* SPEC wiring (preprocessor + account_id field) + +These tests deliberately use no DB fixtures, so they don't trigger the +session-scoped test-database setup in ``api/conftest.py``. They still +require ``api/.env.test`` to define ``DATABASE_URL`` and ``REDIS_URL``, +because the root conftest reads ``api.constants`` at import time. +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fastapi import HTTPException + +from api.schemas.telephony_config import ( + TelephonyConfigurationResponse, + ThreeCxConfigurationRequest, + ThreeCxConfigurationResponse, +) +from api.services.telephony import registry +from api.services.telephony.providers.three_cx import SPEC +from api.services.telephony.providers.three_cx.dialplan import ( + _prefix_to_pattern, + build_dialplan_rows, + outbound_context_for, +) +from api.services.telephony.providers.three_cx.provider import ThreeCxProvider +from api.services.telephony.providers.three_cx.provisioning import ( + _provision_3cx_trunk, + endpoint_id_for, +) + +_FULL_CREDS = { + "ari_endpoint": "http://asterisk.example.com:8088", + "app_name": "dograh", + "app_password": "secret", + "ws_client_name": "dograh_staging", + "sip_domain": "1156.3cx.cloud", + "extension": "12611", + "sip_password": "3cx-sip-secret", + "strip_prefix": "^\\+39", + "from_numbers": ["+393331112222"], +} + + +# --------------------------------------------------------------------------- +# endpoint_id_for +# --------------------------------------------------------------------------- + + +def test_endpoint_id_for_italian_3cx_tenant(): + assert endpoint_id_for("1156.3cx.cloud", "12611") == "dograh_1156_3cx_cloud_12611" + + +def test_endpoint_id_for_normalizes_uppercase_and_dots(): + assert ( + endpoint_id_for("ACME.PBX.3CX.cloud", "200") + == "dograh_acme_pbx_3cx_cloud_200" + ) + + +def test_endpoint_id_for_collapses_runs_of_separators(): + assert endpoint_id_for("foo..bar--baz", "9") == "dograh_foo_bar_baz_9" + + +def test_endpoint_id_for_rejects_empty_sip_domain(): + with pytest.raises(ValueError): + endpoint_id_for("", "12611") + + +def test_endpoint_id_for_rejects_empty_extension(): + with pytest.raises(ValueError): + endpoint_id_for("1156.3cx.cloud", "") + + +# --------------------------------------------------------------------------- +# dialplan +# --------------------------------------------------------------------------- + + +def test_prefix_to_pattern_italian(): + pattern, skip = _prefix_to_pattern("^\\+39") + assert pattern == "_+39N." + assert skip == 3 # '+39' is 3 characters + + +def test_prefix_to_pattern_empty_falls_back_to_match_all(): + pattern, skip = _prefix_to_pattern("") + assert pattern == "_X." + assert skip == 0 + + +def test_prefix_to_pattern_unsupported_regex_raises(): + with pytest.raises(ValueError): + _prefix_to_pattern("^\\+[0-9]{2}") + + +def test_build_dialplan_rows_outbound_dials_into_pjsip_endpoint_with_skip(): + rows = build_dialplan_rows( + endpoint_id="dograh_1156_3cx_cloud_12611", + extension="12611", + stasis_app="dograh", + strip_prefix="^\\+39", + ) + outbound = next(r for r in rows if r["context"].endswith("-outbound")) + assert outbound["app"] == "Dial" + assert outbound["exten"] == "_+39N." + assert outbound["appdata"] == "PJSIP/${EXTEN:3}@dograh_1156_3cx_cloud_12611,60" + + +def test_build_dialplan_rows_inbound_routes_extension_and_wildcard_to_stasis(): + rows = build_dialplan_rows( + endpoint_id="dograh_1156_3cx_cloud_12611", + extension="12611", + stasis_app="dograh", + strip_prefix="", + ) + inbound = [r for r in rows if r["context"].endswith("-inbound")] + extens = {r["exten"] for r in inbound} + assert extens == {"12611", "_X."} + for r in inbound: + assert r["app"] == "Stasis" + assert r["appdata"].startswith("dograh,inbound,") + + +def test_outbound_context_for_matches_dialplan_naming(): + rows = build_dialplan_rows( + endpoint_id="ep1", + extension="10", + stasis_app="dograh", + strip_prefix="", + ) + outbound = next(r for r in rows if r["app"] == "Dial") + assert outbound["context"] == outbound_context_for("ep1") + + +# --------------------------------------------------------------------------- +# Config schemas +# --------------------------------------------------------------------------- + + +def test_config_request_validators_strip_and_lowercase_sip_domain(): + req = ThreeCxConfigurationRequest( + ari_endpoint="http://asterisk:8088", + app_name="dograh", + app_password="x", + sip_domain=" 1156.3CX.Cloud ", + extension=" 12611 ", + sip_password="y", + ) + assert req.sip_domain == "1156.3cx.cloud" + assert req.extension == "12611" + + +def test_config_request_provider_literal_defaults_to_three_cx(): + req = ThreeCxConfigurationRequest( + ari_endpoint="x", + app_name="x", + app_password="x", + sip_domain="1156.3cx.cloud", + extension="12611", + sip_password="x", + ) + assert req.provider == "three_cx" + + +def test_telephony_config_response_can_carry_three_cx(): + """The top-level response model must expose a `three_cx` slot.""" + resp = TelephonyConfigurationResponse( + three_cx=ThreeCxConfigurationResponse( + ari_endpoint="x", + app_name="dograh", + app_password="***", # already masked by caller + sip_domain="1156.3cx.cloud", + extension="12611", + sip_password="***", # already masked by caller + from_numbers=["+393331112222"], + ) + ) + assert resp.three_cx is not None + assert resp.three_cx.app_password == "***" + + +# --------------------------------------------------------------------------- +# Provider +# --------------------------------------------------------------------------- + + +def test_provider_validate_config_with_full_data(): + p = ThreeCxProvider(_FULL_CREDS) + assert p.validate_config() is True + + +def test_provider_validate_config_missing_ari_endpoint_is_false(): + creds = {**_FULL_CREDS, "ari_endpoint": ""} + assert ThreeCxProvider(creds).validate_config() is False + + +def test_provider_parse_inbound_webhook_populates_account_id_from_extension(): + webhook = { + "channel": { + "id": "ch-1", + "state": "Ringing", + "caller": {"number": "+393331112222"}, + "dialplan": {"exten": "12611"}, + } + } + n = ThreeCxProvider.parse_inbound_webhook(webhook) + assert n.provider == "three_cx" + assert n.to_number == "12611" + assert n.account_id == "12611" + assert n.from_number == "+393331112222" + + +def test_provider_parse_inbound_webhook_uses_none_for_missing_extension(): + n = ThreeCxProvider.parse_inbound_webhook({"channel": {}}) + assert n.account_id is None + + +def test_provider_validate_account_id_matches_extension(): + assert ThreeCxProvider.validate_account_id({"extension": "12611"}, "12611") is True + + +def test_provider_validate_account_id_rejects_wrong_extension(): + assert ThreeCxProvider.validate_account_id({"extension": "12611"}, "9999") is False + + +def test_provider_validate_account_id_rejects_missing_config_extension(): + assert ThreeCxProvider.validate_account_id({}, "12611") is False + + +# --------------------------------------------------------------------------- +# SPEC registration +# --------------------------------------------------------------------------- + + +def test_spec_registered_with_account_id_extension_and_preprocessor(): + spec = registry.get("three_cx") + assert spec is SPEC + assert spec.account_id_credential_field == "extension" + assert spec.preprocess_credentials_on_save is not None + assert spec.transport_sample_rate == 8000 + + +def test_spec_ui_metadata_marks_passwords_sensitive(): + by_name = {f.name: f for f in SPEC.ui_metadata.fields} + assert by_name["app_password"].sensitive is True + assert by_name["sip_password"].sensitive is True + # Non-secret fields should NOT be marked sensitive. + assert by_name["sip_domain"].sensitive is False + assert by_name["extension"].sensitive is False + + +# --------------------------------------------------------------------------- +# Provisioning (mocked asyncpg) +# --------------------------------------------------------------------------- + + +def _make_mock_pool(): + """Build a mock asyncpg pool whose ``acquire()`` yields a recording conn.""" + conn = MagicMock() + conn.execute = AsyncMock(return_value="OK") + + tx_ctx = MagicMock() + tx_ctx.__aenter__ = AsyncMock(return_value=None) + tx_ctx.__aexit__ = AsyncMock(return_value=False) + conn.transaction = MagicMock(return_value=tx_ctx) + + acquire_ctx = MagicMock() + acquire_ctx.__aenter__ = AsyncMock(return_value=conn) + acquire_ctx.__aexit__ = AsyncMock(return_value=False) + + pool = MagicMock() + pool.acquire = MagicMock(return_value=acquire_ctx) + return pool, conn + + +@patch( + "api.services.telephony.providers.three_cx.provisioning.get_pool", + new_callable=AsyncMock, +) +async def test_provision_writes_six_table_set_in_single_transaction(get_pool_mock): + pool, conn = _make_mock_pool() + get_pool_mock.return_value = pool + + out = await _provision_3cx_trunk(dict(_FULL_CREDS)) + + # Returns the credentials unchanged — endpoint_id is rederived at runtime. + assert out == _FULL_CREDS + + statements = [call.args[0] for call in conn.execute.await_args_list] + # Idempotency deletes come first (5 statements covering 4 ps_* + extensions). + assert sum(1 for s in statements if s.lstrip().startswith("DELETE")) == 5 + # Then one INSERT per ps_* table + one INSERT per dialplan row (3 rows). + inserts = [s for s in statements if "INSERT" in s] + assert any("ps_auths" in s for s in inserts) + assert any("ps_aors" in s for s in inserts) + assert any("ps_endpoints" in s for s in inserts) + assert any("ps_registrations" in s for s in inserts) + assert sum(1 for s in inserts if "INTO extensions" in s) == 3 + # All inserts must happen inside one transaction context. + assert conn.transaction.call_count == 1 + + +@patch( + "api.services.telephony.providers.three_cx.provisioning.get_pool", + new_callable=AsyncMock, +) +async def test_provision_is_idempotent_on_resave(get_pool_mock): + pool, conn = _make_mock_pool() + get_pool_mock.return_value = pool + + await _provision_3cx_trunk(dict(_FULL_CREDS)) + first_call_count = conn.execute.await_count + + await _provision_3cx_trunk(dict(_FULL_CREDS)) + # Second call performs the same delete-then-insert work. + assert conn.execute.await_count == 2 * first_call_count + + +async def test_provision_raises_400_on_missing_required_field(): + bad = {**_FULL_CREDS} + bad.pop("extension") + with pytest.raises(HTTPException) as exc: + await _provision_3cx_trunk(bad) + assert exc.value.status_code == 400 + assert "extension" in exc.value.detail + + +@patch( + "api.services.telephony.providers.three_cx.provisioning.get_pool", + new_callable=AsyncMock, +) +async def test_provision_translates_ara_not_configured_to_400(get_pool_mock): + from api.services.telephony.providers.three_cx.ara_db import ( + AraNotConfiguredError, + ) + + get_pool_mock.side_effect = AraNotConfiguredError("ASTERISK_ARA_DSN not set") + with pytest.raises(HTTPException) as exc: + await _provision_3cx_trunk(dict(_FULL_CREDS)) + assert exc.value.status_code == 400 + assert "ASTERISK_ARA_DSN" in exc.value.detail + + +@patch( + "api.services.telephony.providers.three_cx.provisioning.get_pool", + new_callable=AsyncMock, +) +async def test_provision_translates_db_error_to_502(get_pool_mock): + pool, conn = _make_mock_pool() + conn.execute = AsyncMock(side_effect=RuntimeError("relation \"ps_auths\" does not exist")) + get_pool_mock.return_value = pool + + with pytest.raises(HTTPException) as exc: + await _provision_3cx_trunk(dict(_FULL_CREDS)) + assert exc.value.status_code == 502 + assert "ps_auths" in exc.value.detail diff --git a/docs/providers/three_cx.md b/docs/providers/three_cx.md new file mode 100644 index 00000000..00026372 --- /dev/null +++ b/docs/providers/three_cx.md @@ -0,0 +1,203 @@ +# 3CX Telephony Provider + +Connect a Dograh AI agent to a **3CX cloud PBX** through an intermediate +**Asterisk** bridge. The Asterisk box terminates the SIP/RTP leg toward +3CX and exposes a standard ARI + externalMedia surface to Dograh — +identical to the [Asterisk ARI provider](../integrations/telephony/asterisk-ari) +plus an automated trunk-provisioning step. + +``` ++-------------------+ SIP/RTP +-------------+ ARI REST + +---------+ +| 3CX cloud SBC | <------------> | Asterisk | WS audio | -> | Dograh | +| 1156.3cx.cloud | | (PJSIP ARA)| | | agent | ++-------------------+ +-------------+ + +---------+ + ^ + | ps_endpoints, ps_aors, + | ps_auths, ps_registrations, + | extensions (Postgres) + | + +----+----+ + | Dograh | + | save UI | + +---------+ +``` + +When an admin saves a 3CX TelephonyConfiguration in the Dograh UI, the +provider's `preprocess_credentials_on_save` hook writes the matching +PJSIP endpoint/aor/auth/registration rows and the `+39`-stripping +dialplan into the Asterisk Realtime Architecture (ARA) Postgres. Asterisk +picks them up dynamically — no `pjsip reload` needed. + +## §1 — Asterisk side prerequisites + +The bridging Asterisk must be ≥ Asterisk 18 with PJSIP and ARA enabled +against the same Postgres Dograh writes to. One Asterisk instance can +serve many Dograh 3CX configurations (multi-tenant) because each trunk +gets a unique endpoint id of the form `dograh__`. + +### 1.1 Postgres tables + +Run the standard Asterisk realtime DDL on the ARA database — the +relevant tables are `ps_auths`, `ps_aors`, `ps_endpoints`, +`ps_registrations`, `ps_transports`, `ps_globals`, and `extensions`. +The canonical schema ships with Asterisk under `contrib/realtime/postgresql/`. + +### 1.2 `res_config_pgsql.conf` + +```ini +[asterisk] +type = pgsql +hostname = postgres.internal +dbname = asterisk_ara +user = asterisk_ro +password = ******** +port = 5432 +requirements = warn +``` + +### 1.3 `sorcery.conf` + +```ini +[res_pjsip] +endpoint = realtime,ps_endpoints +auth = realtime,ps_auths +aor = realtime,ps_aors +domain_alias = realtime,ps_domain_aliases +contact = realtime,ps_contacts + +[res_pjsip_endpoint_identifier_ip] +identify = realtime,ps_endpoint_id_ips + +[res_pjsip_outbound_registration] +registration = realtime,ps_registrations +``` + +### 1.4 `extconfig.conf` + +```ini +[settings] +ps_endpoints = pgsql,asterisk +ps_auths = pgsql,asterisk +ps_aors = pgsql,asterisk +ps_registrations = pgsql,asterisk +extensions = pgsql,asterisk +``` + +### 1.5 Static PJSIP transport + +Dograh writes endpoints that reference a transport by name (default: +`transport-udp`). Define it once in `pjsip.conf`: + +```ini +[transport-udp] +type = transport +protocol = udp +bind = 0.0.0.0:5060 +``` + +### 1.6 Stasis app + externalMedia + +```ini +; ari.conf +[general] +enabled = yes +[dograh] +type = user +read_only = no +password = + +; websocket_client.conf +[dograh_staging] +type = websocket_client +uri = ws://dograh-backend:8000/api/v1/telephony/ws/ari +protocols = media +connection_type = persistent +``` + +Start the Stasis app and confirm registration is happening: + +```bash +asterisk -rx "module reload res_pjsip.so" +asterisk -rx "pjsip show registrations" +``` + +## §2 — Dograh side prerequisites + +Set the connection string to the ARA Postgres in `api/.env`: + +```bash +ASTERISK_ARA_DSN=postgresql://dograh_rw:********@postgres.internal:5432/asterisk_ara +``` + +The user needs `SELECT, INSERT, UPDATE, DELETE` on `ps_auths`, +`ps_aors`, `ps_endpoints`, `ps_registrations`, and `extensions`. No DDL +permissions required at runtime. + +Restart the Dograh API process after setting the env var. + +## §3 — Per-trunk flow in the Dograh UI + +For each 3CX tenant + extension Dograh should serve: + +1. Open *Telephony Configurations* → *Add* → select **3CX (Asterisk bridge)**. +2. Fill in the form: + + | Field | Value (example) | + | --- | --- | + | ARI Endpoint | `http://asterisk.internal:8088` | + | Stasis App Name | `dograh` | + | ARI Password | _(matches `ari.conf` `[dograh]` password)_ | + | websocket_client.conf Name | `dograh_staging` | + | 3CX SIP Domain | `1156.3cx.cloud` | + | 3CX Extension | `12611` | + | SIP Password | _(from `~/.claude-phone/.env` or 3CX admin console)_ | + | Strip Prefix (regex) | `^\+39` | + | From Numbers | `+393331112222` | + +3. Save. On save the `preprocess_credentials_on_save` hook writes the + six-table ARA set in a single transaction. A failure aborts the save + with `HTTP 502` and a message describing which write failed; nothing + persists. + +## §4 — Verification + +Confirm the trunk landed in ARA: + +```bash +psql $ASTERISK_ARA_DSN -c \ + "SELECT id FROM ps_endpoints WHERE id LIKE 'dograh\\_%'" +psql $ASTERISK_ARA_DSN -c \ + "SELECT id, server_uri FROM ps_registrations WHERE id LIKE 'dograh\\_%'" +``` + +Confirm Asterisk has registered upstream with 3CX: + +```bash +asterisk -rx "pjsip show registrations" +# Expect: Registered +``` + +Originate a test outbound call from Dograh and verify the `+39` prefix +was stripped on the way out: + +```bash +asterisk -rx "core set verbose 4" +# In another terminal: trigger an outbound from the Dograh API. +# In the Asterisk console you should see: +# Dial: PJSIP/3331112222@dograh_1156_3cx_cloud_12611 +# i.e. without '+39'. +``` + +## Known limitations + +* The hook **only** supports the literal `^\+` regex form for + `strip_prefix`. PCRE alternation isn't translated to Asterisk's + ad-hoc extension pattern syntax. Adding a `[02-9]` or branching + regex needs an extension to `dialplan._prefix_to_pattern`. +* Deprovisioning on TelephonyConfiguration deletion is not currently + wired. `provisioning._deprovision_3cx_trunk` exists as a callable but + no registry hook fires it; admin tooling can call it directly. Filed + for follow-up rather than in scope for the initial provider. +* `transport_name` is hard-coded to `transport-udp` (overridable per + credentials dict). TLS or TCP trunks toward 3CX need the admin to + define the transport and pass the name through. From e15246a51529a7ef2619c15559808b89f1b0ea83 Mon Sep 17 00:00:00 2001 From: stefandsl Date: Tue, 26 May 2026 13:29:51 +0200 Subject: [PATCH 2/3] chore(ui): align dev server port with production (3010) Co-Authored-By: Claude Opus 4.7 (1M context) --- ui/AGENTS.md | 2 +- ui/package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ui/AGENTS.md b/ui/AGENTS.md index 83f818bf..dad34c5b 100644 --- a/ui/AGENTS.md +++ b/ui/AGENTS.md @@ -75,5 +75,5 @@ The auth interceptor (which attaches the Bearer token) is only registered once a ```bash npm install -npm run dev # Runs on port 3000 +npm run dev # Runs on port 3010 ``` diff --git a/ui/package.json b/ui/package.json index 1e4f7c53..e4ecae7f 100644 --- a/ui/package.json +++ b/ui/package.json @@ -3,7 +3,7 @@ "version": "1.31.0", "private": true, "scripts": { - "dev": "cross-env NODE_OPTIONS=--enable-source-maps next dev --turbopack", + "dev": "cross-env NODE_OPTIONS=--enable-source-maps next dev --turbopack -p 3010", "build": "next build", "start": "next start", "lint": "next lint", From 592e6f7970c65ce6ab37ca71ec4c8542c3ab139e Mon Sep 17 00:00:00 2001 From: stefandsl Date: Tue, 26 May 2026 15:05:13 +0200 Subject: [PATCH 3/3] chore(dev): add docker-compose.dev.yaml and default to same-origin UI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an opt-in compose override that bind-mounts the three_cx provider files (and the alembic versions dir, so newer local migrations resolve) into the prebuilt API image — useful when iterating on a provider without rebuilding. Loaded explicitly via `docker compose -f docker-compose.yaml -f docker-compose.dev.yaml up`. Also blanks the default `NEXT_PUBLIC_BACKEND_URL` in `ui/.env.example` so the browser uses `window.location.origin` and the Next.js rewrite in `next.config.ts` proxies `/api/v1/*` to the backend. This works for both single-host dev and remote dev (where `localhost:8000` from the browser points at the wrong machine). Co-Authored-By: Claude Opus 4.7 (1M context) --- docker-compose.dev.yaml | 7 +++++++ ui/.env.example | 2 +- ui/package-lock.json | 4 ++-- 3 files changed, 10 insertions(+), 3 deletions(-) create mode 100644 docker-compose.dev.yaml diff --git a/docker-compose.dev.yaml b/docker-compose.dev.yaml new file mode 100644 index 00000000..6d99cc53 --- /dev/null +++ b/docker-compose.dev.yaml @@ -0,0 +1,7 @@ +services: + api: + volumes: + - ./api/services/telephony/providers/three_cx:/app/api/services/telephony/providers/three_cx:ro + - ./api/services/telephony/providers/__init__.py:/app/api/services/telephony/providers/__init__.py:ro + - ./api/schemas/telephony_config.py:/app/api/schemas/telephony_config.py:ro + - ./api/alembic/versions:/app/api/alembic/versions:ro diff --git a/ui/.env.example b/ui/.env.example index 741790e5..185aa15c 100644 --- a/ui/.env.example +++ b/ui/.env.example @@ -1,3 +1,3 @@ BACKEND_URL=http://localhost:8000 -NEXT_PUBLIC_BACKEND_URL=http://localhost:8000 +NEXT_PUBLIC_BACKEND_URL= NEXT_PUBLIC_NODE_ENV=development diff --git a/ui/package-lock.json b/ui/package-lock.json index 9923bb0f..6017771b 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -1,12 +1,12 @@ { "name": "ui", - "version": "1.30.1", + "version": "1.31.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "ui", - "version": "1.30.1", + "version": "1.31.0", "dependencies": { "@dagrejs/dagre": "^1.1.4", "@radix-ui/react-alert-dialog": "^1.1.15",