dograh/api/services/telephony/factory.py
Abhishek 7fd3b96470
feat: agent stream for cloudonix OPBX (#261)
* feat: agent stream for cloudonix OPBX

* feat: make cloudonix app name optional

* feat: create application while configuring telephony config

* fix: get telephony configuration from stamped workflow run

* fix: fix vobiz hangup URL
2026-05-02 15:53:58 +05:30

249 lines
9.2 KiB
Python

"""Factory for creating telephony providers.
Resolves a provider instance from a stored telephony configuration. Three
resolution paths exist:
* by config id — the canonical path used by outbound (test calls, campaigns,
API triggers) and by the websocket transport once a workflow run has
``initial_context.telephony_configuration_id`` stamped on it.
* by org default — used as a fallback when no specific config is requested
(e.g. the legacy ``/telephony-config`` endpoint, the back-compat
``get_telephony_provider(organization_id)`` shim).
* for inbound — given a detected provider and an account-id from the webhook,
iterate the org's configs of that provider and return the one whose stored
account-id credential matches.
Provider classes don't need to know about the new storage shape. They still
receive a normalized config dict containing credentials plus a
``from_numbers`` list of address strings, which the factory assembles by
joining ``telephony_phone_numbers``.
"""
from typing import Any, Dict, List, Optional, Tuple, Type
from loguru import logger
from api.db import db_client
from api.db.models import TelephonyConfigurationModel, WorkflowRunModel
from api.services.telephony import registry
from api.services.telephony.base import TelephonyProvider
async def load_telephony_config_by_id(
telephony_configuration_id: int,
organization_id: int,
) -> Dict[str, Any]:
"""Load and normalize the config row by primary key, scoped to the org.
Returns a dict in the shape each provider class expects in its constructor
(provider name + provider-specific credentials + ``from_numbers`` list of
raw address strings). Raises ``ValueError`` if the config doesn't exist
or doesn't belong to ``organization_id`` — the org scope is what makes
this safe to expose to user-driven request flows.
"""
if not telephony_configuration_id:
raise ValueError("telephony_configuration_id is required")
if not organization_id:
raise ValueError("organization_id is required")
row = await db_client.get_telephony_configuration_for_org(
telephony_configuration_id, organization_id
)
if not row:
raise ValueError(
f"Telephony configuration {telephony_configuration_id} not found "
f"for organization {organization_id}"
)
return await _normalize_with_phone_numbers(row)
async def load_default_telephony_config(organization_id: int) -> Dict[str, Any]:
"""Load the org's default outbound config."""
if not organization_id:
raise ValueError("organization_id is required")
row = await db_client.get_default_telephony_configuration(organization_id)
if not row:
raise ValueError(
f"No default telephony configuration found for organization "
f"{organization_id}"
)
return await _normalize_with_phone_numbers(row)
async def find_telephony_config_for_inbound(
organization_id: int, provider_name: str, account_id: Optional[str]
) -> Optional[Tuple[int, Dict[str, Any]]]:
"""Match an inbound webhook to one of the org's configs of the detected
provider. Returns ``(config_id, normalized_config)`` or None.
Always scoped to ``organization_id`` — never matches across orgs even if
two orgs happen to have credentials with the same account_id.
"""
spec = registry.get_optional(provider_name)
if not spec:
return None
candidates = await db_client.list_telephony_configurations_by_provider(
organization_id, provider_name
)
if not candidates:
return None
field = spec.account_id_credential_field
matched: Optional[TelephonyConfigurationModel] = None
if not field:
# Provider has no account-id concept (e.g. ARI); only one config of this
# provider is meaningful per org.
if len(candidates) == 1:
matched = candidates[0]
else:
logger.warning(
f"Provider {provider_name} has multiple configs in org "
f"{organization_id} but no account_id field to disambiguate; "
f"picking the default outbound (or first)."
)
matched = next(
(c for c in candidates if c.is_default_outbound), candidates[0]
)
elif account_id:
for cand in candidates:
stored = (cand.credentials or {}).get(field)
if stored and stored == account_id:
matched = cand
break
if not matched:
return None
normalized = await _normalize_with_phone_numbers(matched)
return matched.id, normalized
async def get_telephony_provider_by_id(
telephony_configuration_id: int,
organization_id: int,
) -> TelephonyProvider:
config = await load_telephony_config_by_id(
telephony_configuration_id, organization_id
)
return _instantiate(config)
async def get_telephony_provider_for_run(
workflow_run: WorkflowRunModel,
organization_id: int,
) -> TelephonyProvider:
"""Resolve the provider for a given workflow run.
Prefers ``initial_context.telephony_configuration_id`` — stamped at run
creation by ``/initiate-call``, ``_create_inbound_workflow_run``, the
campaign dispatcher, and ``public_agent``. Falls back to the org's
default config so legacy runs created before the multi-config migration
still resolve.
"""
cfg_id = (workflow_run.initial_context or {}).get("telephony_configuration_id")
if cfg_id:
return await get_telephony_provider_by_id(cfg_id, organization_id)
return await get_default_telephony_provider(organization_id)
async def get_default_telephony_provider(organization_id: int) -> TelephonyProvider:
config = await load_default_telephony_config(organization_id)
return _instantiate(config)
async def get_telephony_provider_for_inbound(
organization_id: int, provider_name: str, account_id: Optional[str]
) -> Optional[Tuple[int, TelephonyProvider]]:
"""Returns ``(config_id, provider_instance)`` or None when no config matches."""
match = await find_telephony_config_for_inbound(
organization_id, provider_name, account_id
)
if not match:
return None
config_id, config = match
return config_id, _instantiate(config)
async def load_credentials_for_transport(
organization_id: int,
telephony_configuration_id: Optional[int],
expected_provider: str,
) -> Dict[str, Any]:
"""Helper for per-provider transport modules.
Resolves the right credentials for a websocket transport given what's
available on the workflow run. Uses ``telephony_configuration_id`` when
stamped (the new path), otherwise falls back to the org's default config
so legacy runs created before the multi-config migration still work.
Raises ValueError when the resolved config is for a different provider.
"""
if telephony_configuration_id:
config = await load_telephony_config_by_id(
telephony_configuration_id, organization_id
)
else:
config = await load_default_telephony_config(organization_id)
actual = config.get("provider")
if actual != expected_provider:
raise ValueError(
f"Expected {expected_provider} provider, got {actual} "
f"(config_id={telephony_configuration_id}, org={organization_id})"
)
return config
# ---------------------------------------------------------------------------
# Back-compat shims
# ---------------------------------------------------------------------------
async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
"""Deprecated: returns the org's default config.
Existing callers that don't carry a config id continue to work via this
shim. New code should pass an explicit telephony_configuration_id."""
return await load_default_telephony_config(organization_id)
async def get_telephony_provider(organization_id: int) -> TelephonyProvider:
"""Deprecated: returns a provider for the org's default config.
See ``load_telephony_config`` above. New code should call
``get_telephony_provider_by_id`` with the resolved config id.
"""
return await get_default_telephony_provider(organization_id)
async def get_all_telephony_providers() -> List[Type[TelephonyProvider]]:
"""All registered provider classes — used by inbound webhook detection."""
return [spec.provider_cls for spec in registry.all_specs()]
# ---------------------------------------------------------------------------
# Internals
# ---------------------------------------------------------------------------
async def _normalize_with_phone_numbers(
row: TelephonyConfigurationModel,
) -> Dict[str, Any]:
"""Run the provider's config_loader over the credentials, then attach the
active phone numbers as a ``from_numbers`` list (raw address strings)."""
spec = registry.get(row.provider)
raw = dict(row.credentials or {})
raw["provider"] = row.provider
base = spec.config_loader(raw)
addresses = await db_client.list_active_normalized_addresses_for_config(row.id)
base["from_numbers"] = addresses
return base
def _instantiate(config: Dict[str, Any]) -> TelephonyProvider:
spec = registry.get(config["provider"])
logger.info(f"Creating {spec.name} telephony provider")
return spec.provider_cls(config)