dograh/api/routes/organization.py

964 lines
35 KiB
Python
Raw Permalink Normal View History

from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException
from loguru import logger
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from api.constants import DEFAULT_CAMPAIGN_RETRY_CONFIG, DEFAULT_ORG_CONCURRENCY_LIMIT
from api.db import db_client
from api.db.models import UserModel
from api.db.telephony_configuration_client import TelephonyConfigurationInUseError
from api.enums import OrganizationConfigurationKey, PostHogEvent
from api.schemas.telephony_config import (
TelephonyConfigRequest,
TelephonyConfigurationCreateRequest,
TelephonyConfigurationDetail,
TelephonyConfigurationListItem,
TelephonyConfigurationListResponse,
TelephonyConfigurationResponse,
TelephonyConfigurationUpdateRequest,
)
from api.schemas.telephony_phone_number import (
PhoneNumberCreateRequest,
PhoneNumberListResponse,
PhoneNumberResponse,
PhoneNumberUpdateRequest,
ProviderSyncStatus,
)
from api.services.auth.depends import get_user
from api.services.configuration.masking import is_mask_of, mask_key
from api.services.posthog_client import capture_event
from api.services.telephony import registry as telephony_registry
from api.services.telephony.factory import get_telephony_provider_by_id
from api.services.worker_sync.manager import get_worker_sync_manager
from api.services.worker_sync.protocol import WorkerSyncEventType
from api.utils.common import get_backend_endpoints
router = APIRouter(prefix="/organizations", tags=["organizations"])
def _sensitive_fields(provider_name: str) -> List[str]:
"""Field names that should be masked when displaying stored config.
Sourced from ProviderUIField.sensitive in the registry the same source
of truth that drives the form-rendering UI.
"""
spec = telephony_registry.get_optional(provider_name)
if spec is None or spec.ui_metadata is None:
return []
return [f.name for f in spec.ui_metadata.fields if f.sensitive]
def _mask_sensitive(provider_name: str, value: dict) -> dict:
"""Return a copy of ``value`` with sensitive fields masked for display."""
out = dict(value)
for field_name in _sensitive_fields(provider_name):
v = out.get(field_name)
if v:
out[field_name] = mask_key(v)
return out
class TelephonyProviderUIField(BaseModel):
"""One form field on a telephony provider's configuration UI."""
name: str
label: str
type: str
required: bool
sensitive: bool
description: Optional[str] = None
placeholder: Optional[str] = None
class TelephonyProviderMetadata(BaseModel):
"""UI form metadata for a single telephony provider."""
provider: str
display_name: str
fields: List[TelephonyProviderUIField]
docs_url: Optional[str] = None
class TelephonyProvidersMetadataResponse(BaseModel):
"""List of UI form definitions used by the telephony-config screen."""
providers: List[TelephonyProviderMetadata]
class TelephonyConfigWarningsResponse(BaseModel):
"""Aggregated telephony-configuration warning counts for the user's org.
Drives the page banner and nav badge that nudge customers to finish
optional-but-recommended configuration steps. Shape is a flat dict so
new warning types can be added without breaking the client.
"""
telnyx_missing_webhook_public_key_count: int
@router.get(
"/telephony-providers/metadata",
response_model=TelephonyProvidersMetadataResponse,
)
async def get_telephony_providers_metadata(user: UserModel = Depends(get_user)):
"""Return the list of available telephony providers and their form schemas.
The UI uses this to render the configuration form generically instead of
hard-coding fields per provider. Adding a new provider only requires
declaring its ui_metadata in providers/<name>/__init__.py.
"""
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
providers = []
for spec in telephony_registry.all_specs():
if spec.ui_metadata is None:
continue
providers.append(
TelephonyProviderMetadata(
provider=spec.name,
display_name=spec.ui_metadata.display_name,
fields=[
TelephonyProviderUIField(
name=f.name,
label=f.label,
type=f.type,
required=f.required,
sensitive=f.sensitive,
description=f.description,
placeholder=f.placeholder,
)
for f in spec.ui_metadata.fields
],
docs_url=spec.ui_metadata.docs_url,
)
)
return TelephonyProvidersMetadataResponse(providers=providers)
@router.get(
"/telephony-config-warnings",
response_model=TelephonyConfigWarningsResponse,
)
async def get_telephony_config_warnings(user: UserModel = Depends(get_user)):
"""Return aggregated warning counts for the current org's telephony configs.
Today this surfaces only Telnyx configs missing ``webhook_public_key``;
additional warning types should be added as new fields on the response.
"""
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
telnyx_missing = await db_client.count_telnyx_configs_missing_webhook_public_key(
user.selected_organization_id
)
return TelephonyConfigWarningsResponse(
telnyx_missing_webhook_public_key_count=telnyx_missing,
)
def preserve_masked_fields(provider: str, request_dict: dict, existing: dict):
"""If the client re-submitted a masked sensitive field, restore the original."""
for field_name in _sensitive_fields(provider):
v = request_dict.get(field_name)
if v and is_mask_of(v, existing.get(field_name, "")):
request_dict[field_name] = existing[field_name]
def _credentials_from_payload(config: TelephonyConfigRequest) -> dict:
"""Provider credentials only — strip provider/from_numbers from the payload."""
payload = config.model_dump()
payload.pop("provider", None)
payload.pop("from_numbers", None)
return payload
async def _run_preprocess_hook(provider: str, credentials: dict) -> dict:
"""Invoke the provider's optional credentials preprocessor before save."""
spec = telephony_registry.get_optional(provider)
if spec and spec.preprocess_credentials_on_save:
return await spec.preprocess_credentials_on_save(credentials)
return credentials
def _phone_number_to_response(
row, inbound_workflow_name: Optional[str] = None
) -> PhoneNumberResponse:
response = PhoneNumberResponse.model_validate(row)
response.inbound_workflow_name = inbound_workflow_name
return response
async def _sync_inbound_for_phone_number(
config_id: int, organization_id: int, address: str
) -> ProviderSyncStatus:
"""Push inbound webhook configuration to the provider.
``attach=True``: ask the provider to route this number's inbound calls
to our workflow-agnostic dispatcher (``/api/v1/telephony/inbound/run``).
``attach=False``: ask the provider to detach. The dispatcher resolves
the workflow from the called number's ``inbound_workflow_id``, so the
webhook URL is the same for every assignment providers only need to
bind/unbind the number, not rewrite per-workflow URLs.
"""
try:
provider = await get_telephony_provider_by_id(config_id, organization_id)
except Exception as e:
logger.error(f"Failed to load telephony provider for config {config_id}: {e}")
return ProviderSyncStatus(ok=False, message=f"Provider load failed: {e}")
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = f"{backend_endpoint}/api/v1/telephony/inbound/run"
try:
result = await provider.configure_inbound(address, webhook_url)
except Exception as e:
logger.error(
f"Provider configure_inbound raised for config {config_id} "
f"address {address}: {e}"
)
return ProviderSyncStatus(ok=False, message=f"Provider sync failed: {e}")
return ProviderSyncStatus(ok=result.ok, message=result.message)
# ---------------------------------------------------------------------------
# Multi-config CRUD
# ---------------------------------------------------------------------------
@router.get("/telephony-configs", response_model=TelephonyConfigurationListResponse)
async def list_telephony_configurations(user: UserModel = Depends(get_user)):
"""List the org's telephony configurations with phone-number counts."""
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
rows = await db_client.list_telephony_configurations(user.selected_organization_id)
items: List[TelephonyConfigurationListItem] = []
for row in rows:
numbers = await db_client.list_phone_numbers_for_config(row.id)
items.append(
TelephonyConfigurationListItem(
id=row.id,
name=row.name,
provider=row.provider,
is_default_outbound=row.is_default_outbound,
phone_number_count=len([n for n in numbers if n.is_active]),
created_at=row.created_at,
updated_at=row.updated_at,
)
)
return TelephonyConfigurationListResponse(configurations=items)
@router.post("/telephony-configs", response_model=TelephonyConfigurationDetail)
async def create_telephony_configuration(
request: TelephonyConfigurationCreateRequest,
user: UserModel = Depends(get_user),
):
"""Create a new telephony configuration for the org."""
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
credentials = _credentials_from_payload(request.config)
credentials = await _run_preprocess_hook(request.config.provider, credentials)
try:
row = await db_client.create_telephony_configuration(
organization_id=user.selected_organization_id,
name=request.name,
provider=request.config.provider,
credentials=credentials,
is_default_outbound=request.is_default_outbound,
)
except IntegrityError as e:
if "uq_telephony_configurations_org_name" in str(e):
raise HTTPException(
status_code=409,
detail=(
f"A telephony configuration named '{request.name}' already "
f"exists in this organization. Pick a different name."
),
)
raise HTTPException(
status_code=409,
detail="Telephony configuration violates a uniqueness constraint.",
)
capture_event(
distinct_id=str(user.provider_id),
event=PostHogEvent.TELEPHONY_CONFIGURED,
properties={
"provider": request.config.provider,
"organization_id": user.selected_organization_id,
"config_id": row.id,
},
)
return _detail_response(row)
@router.get(
"/telephony-configs/{config_id}", response_model=TelephonyConfigurationDetail
)
async def get_telephony_configuration_by_id(
config_id: int, user: UserModel = Depends(get_user)
):
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
row = await db_client.get_telephony_configuration_for_org(
config_id, user.selected_organization_id
)
if not row:
raise HTTPException(status_code=404, detail="Telephony configuration not found")
return _detail_response(row)
@router.put(
"/telephony-configs/{config_id}", response_model=TelephonyConfigurationDetail
)
async def update_telephony_configuration(
config_id: int,
request: TelephonyConfigurationUpdateRequest,
user: UserModel = Depends(get_user),
):
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
existing = await db_client.get_telephony_configuration_for_org(
config_id, user.selected_organization_id
)
if not existing:
raise HTTPException(status_code=404, detail="Telephony configuration not found")
credentials = None
if request.config is not None:
if request.config.provider != existing.provider:
raise HTTPException(
status_code=400,
detail="Provider cannot be changed; create a new configuration instead.",
)
credentials = _credentials_from_payload(request.config)
preserve_masked_fields(
existing.provider, credentials, existing.credentials or {}
)
credentials = await _run_preprocess_hook(existing.provider, credentials)
row = await db_client.update_telephony_configuration(
config_id=config_id,
organization_id=user.selected_organization_id,
name=request.name,
credentials=credentials,
)
return _detail_response(row)
@router.post(
"/telephony-configs/{config_id}/set-default-outbound",
response_model=TelephonyConfigurationDetail,
)
async def set_default_outbound(config_id: int, user: UserModel = Depends(get_user)):
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
row = await db_client.set_default_telephony_configuration(
config_id, user.selected_organization_id
)
if not row:
raise HTTPException(status_code=404, detail="Telephony configuration not found")
return _detail_response(row)
@router.delete("/telephony-configs/{config_id}")
async def delete_telephony_configuration(
config_id: int, user: UserModel = Depends(get_user)
):
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
try:
deleted = await db_client.delete_telephony_configuration(
config_id, user.selected_organization_id
)
except TelephonyConfigurationInUseError as e:
raise HTTPException(status_code=409, detail=str(e))
if not deleted:
raise HTTPException(status_code=404, detail="Telephony configuration not found")
return {"message": "Telephony configuration deleted"}
def _detail_response(row) -> TelephonyConfigurationDetail:
masked = _mask_sensitive(row.provider, row.credentials or {})
return TelephonyConfigurationDetail(
id=row.id,
name=row.name,
provider=row.provider,
is_default_outbound=row.is_default_outbound,
credentials=masked,
created_at=row.created_at,
updated_at=row.updated_at,
)
# ---------------------------------------------------------------------------
# Phone numbers (nested under a config)
# ---------------------------------------------------------------------------
async def _ensure_config_belongs_to_org(config_id: int, organization_id: int):
cfg = await db_client.get_telephony_configuration_for_org(
config_id, organization_id
)
if not cfg:
raise HTTPException(status_code=404, detail="Telephony configuration not found")
return cfg
async def _ensure_workflow_belongs_to_org(workflow_id: int, organization_id: int):
workflow = await db_client.get_workflow(
workflow_id, organization_id=organization_id
)
if not workflow:
raise HTTPException(status_code=404, detail="Workflow not found")
return workflow
@router.get(
"/telephony-configs/{config_id}/phone-numbers",
response_model=PhoneNumberListResponse,
)
async def list_phone_numbers(config_id: int, user: UserModel = Depends(get_user)):
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
await _ensure_config_belongs_to_org(config_id, user.selected_organization_id)
rows = await db_client.list_phone_numbers_with_workflow_name_for_config(config_id)
return PhoneNumberListResponse(
phone_numbers=[_phone_number_to_response(r, name) for r, name in rows]
)
@router.post(
"/telephony-configs/{config_id}/phone-numbers",
response_model=PhoneNumberResponse,
)
async def create_phone_number(
config_id: int,
request: PhoneNumberCreateRequest,
user: UserModel = Depends(get_user),
):
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
cfg = await _ensure_config_belongs_to_org(config_id, user.selected_organization_id)
if request.inbound_workflow_id is not None:
await _ensure_workflow_belongs_to_org(
request.inbound_workflow_id, user.selected_organization_id
)
# Inbound dispatch (find_inbound_route_by_account) keys on (provider,
# credentials[account_id_field], address_normalized) without the org, so
# that tuple has to be globally unique. Reject up front if another config —
# in this org or any other — already owns the same combination.
spec = telephony_registry.get_optional(cfg.provider)
account_field = spec.account_id_credential_field if spec else ""
account_id = (cfg.credentials or {}).get(account_field) if account_field else None
if account_id:
try:
conflict = await db_client.find_inbound_routing_conflict(
provider=cfg.provider,
account_id_field=account_field,
account_id=account_id,
address=request.address,
country_hint=request.country_code,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if conflict:
existing_cfg, existing_phone = conflict
same_org = existing_cfg.organization_id == user.selected_organization_id
scope = (
f"telephony configuration '{existing_cfg.name}'"
if same_org
else "another organization using the same provider account"
)
raise HTTPException(
status_code=409,
detail=(
f"Phone number {existing_phone.address} is already registered "
f"under {scope}. Inbound calls cannot be uniquely routed when "
f"the same number is configured against the same provider "
f"account in more than one place."
),
)
try:
row = await db_client.create_phone_number(
organization_id=user.selected_organization_id,
telephony_configuration_id=config_id,
address=request.address,
country_code=request.country_code,
label=request.label,
inbound_workflow_id=request.inbound_workflow_id,
is_active=request.is_active,
is_default_caller_id=request.is_default_caller_id,
extra_metadata=request.extra_metadata,
)
except IntegrityError:
raise HTTPException(
status_code=409,
detail="A phone number with this address already exists in the org.",
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
response = _phone_number_to_response(row)
if request.inbound_workflow_id is not None:
response.provider_sync = await _sync_inbound_for_phone_number(
config_id, user.selected_organization_id, row.address
)
return response
@router.get(
"/telephony-configs/{config_id}/phone-numbers/{phone_number_id}",
response_model=PhoneNumberResponse,
)
async def get_phone_number(
config_id: int,
phone_number_id: int,
user: UserModel = Depends(get_user),
):
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
await _ensure_config_belongs_to_org(config_id, user.selected_organization_id)
row = await db_client.get_phone_number_for_config(phone_number_id, config_id)
if not row:
raise HTTPException(status_code=404, detail="Phone number not found")
return _phone_number_to_response(row)
@router.put(
"/telephony-configs/{config_id}/phone-numbers/{phone_number_id}",
response_model=PhoneNumberResponse,
)
async def update_phone_number(
config_id: int,
phone_number_id: int,
request: PhoneNumberUpdateRequest,
user: UserModel = Depends(get_user),
):
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
await _ensure_config_belongs_to_org(config_id, user.selected_organization_id)
existing = await db_client.get_phone_number_for_config(phone_number_id, config_id)
if not existing:
raise HTTPException(status_code=404, detail="Phone number not found")
if request.inbound_workflow_id is not None:
await _ensure_workflow_belongs_to_org(
request.inbound_workflow_id, user.selected_organization_id
)
row = await db_client.update_phone_number(
phone_number_id=phone_number_id,
telephony_configuration_id=config_id,
label=request.label,
inbound_workflow_id=request.inbound_workflow_id,
is_active=request.is_active,
country_code=request.country_code,
extra_metadata=request.extra_metadata,
clear_inbound_workflow=request.clear_inbound_workflow,
)
if not row:
raise HTTPException(status_code=404, detail="Phone number not found")
response = _phone_number_to_response(row)
# Sync the provider application or address with the inbound
# calling webhook address
response.provider_sync = await _sync_inbound_for_phone_number(
config_id, user.selected_organization_id, row.address
)
return response
@router.post(
"/telephony-configs/{config_id}/phone-numbers/{phone_number_id}/set-default-caller",
response_model=PhoneNumberResponse,
)
async def set_default_caller_id(
config_id: int,
phone_number_id: int,
user: UserModel = Depends(get_user),
):
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
await _ensure_config_belongs_to_org(config_id, user.selected_organization_id)
row = await db_client.set_default_caller_id(phone_number_id, config_id)
if not row:
raise HTTPException(status_code=404, detail="Phone number not found")
return _phone_number_to_response(row)
@router.delete("/telephony-configs/{config_id}/phone-numbers/{phone_number_id}")
async def delete_phone_number(
config_id: int,
phone_number_id: int,
user: UserModel = Depends(get_user),
):
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
await _ensure_config_belongs_to_org(config_id, user.selected_organization_id)
existing = await db_client.get_phone_number_for_config(phone_number_id, config_id)
if not existing:
raise HTTPException(status_code=404, detail="Phone number not found")
deleted = await db_client.delete_phone_number(phone_number_id, config_id)
if not deleted:
raise HTTPException(status_code=404, detail="Phone number not found")
return {"message": "Phone number deleted"}
# ---------------------------------------------------------------------------
# Legacy single-config shim
# ---------------------------------------------------------------------------
@router.get("/telephony-config", response_model=TelephonyConfigurationResponse)
async def get_telephony_configuration(user: UserModel = Depends(get_user)):
"""Legacy: returns the org's default config in the original per-provider
response shape so the existing single-form UI keeps working. Prefer the
multi-config endpoints (``/telephony-configs``) for new clients.
"""
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
cfg = await db_client.get_default_telephony_configuration(
user.selected_organization_id
)
if not cfg:
return TelephonyConfigurationResponse()
spec = telephony_registry.get_optional(cfg.provider)
if spec is None:
return TelephonyConfigurationResponse()
addresses = await db_client.list_active_normalized_addresses_for_config(cfg.id)
masked = _mask_sensitive(cfg.provider, cfg.credentials or {})
payload = {**masked, "provider": cfg.provider, "from_numbers": addresses}
response_obj = spec.config_response_cls.model_validate(payload)
return TelephonyConfigurationResponse(**{cfg.provider: response_obj})
@router.post("/telephony-config")
async def save_telephony_configuration(
request: TelephonyConfigRequest,
user: UserModel = Depends(get_user),
):
"""Legacy: upserts the org's default config (and its phone numbers) in the
original payload shape so existing UI clients keep working. Prefer the
multi-config + phone-number endpoints for new clients.
"""
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
payload = request.model_dump()
new_addresses = payload.pop("from_numbers", []) or []
payload.pop("provider", None)
default = await db_client.get_default_telephony_configuration(
user.selected_organization_id
)
if default and default.provider == request.provider:
preserve_masked_fields(request.provider, payload, default.credentials or {})
row = await db_client.update_telephony_configuration(
config_id=default.id,
organization_id=user.selected_organization_id,
credentials=payload,
)
else:
row = await db_client.create_telephony_configuration(
organization_id=user.selected_organization_id,
name=f"{request.provider.title()} Default",
provider=request.provider,
credentials=payload,
is_default_outbound=True,
)
# Replace the phone-number set with the inline payload.
existing_numbers = await db_client.list_phone_numbers_for_config(row.id)
existing_by_address = {n.address: n for n in existing_numbers}
incoming_set = set(new_addresses)
for addr in new_addresses:
if addr in existing_by_address:
continue
try:
await db_client.create_phone_number(
organization_id=user.selected_organization_id,
telephony_configuration_id=row.id,
address=addr,
)
except IntegrityError:
logger.warning(
f"Skipping duplicate phone number {addr!r} for config {row.id}"
)
except ValueError as e:
logger.warning(f"Skipping invalid phone number {addr!r}: {e}")
for n in existing_numbers:
if n.address not in incoming_set:
await db_client.delete_phone_number(n.id, row.id)
capture_event(
distinct_id=str(user.provider_id),
event=PostHogEvent.TELEPHONY_CONFIGURED,
properties={
"provider": request.provider,
"phone_number_count": len(new_addresses),
"organization_id": user.selected_organization_id,
},
)
return {"message": "Telephony configuration saved successfully"}
2026-03-23 11:36:39 +05:30
class LangfuseCredentialsRequest(BaseModel):
host: str
public_key: str
secret_key: str
class LangfuseCredentialsResponse(BaseModel):
host: str = ""
public_key: str = ""
secret_key: str = ""
configured: bool = False
@router.get("/langfuse-credentials", response_model=LangfuseCredentialsResponse)
async def get_langfuse_credentials(user: UserModel = Depends(get_user)):
"""Get Langfuse credentials for the user's organization with masked sensitive fields."""
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
config = await db_client.get_configuration(
user.selected_organization_id,
OrganizationConfigurationKey.LANGFUSE_CREDENTIALS.value,
)
if not config or not config.value:
return LangfuseCredentialsResponse()
return LangfuseCredentialsResponse(
host=config.value.get("host", ""),
public_key=mask_key(config.value.get("public_key", "")),
secret_key=mask_key(config.value.get("secret_key", "")),
configured=True,
)
@router.post("/langfuse-credentials")
async def save_langfuse_credentials(
request: LangfuseCredentialsRequest,
user: UserModel = Depends(get_user),
):
"""Save Langfuse credentials for the user's organization."""
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
existing_config = await db_client.get_configuration(
user.selected_organization_id,
OrganizationConfigurationKey.LANGFUSE_CREDENTIALS.value,
)
config_value = {
"host": request.host,
"public_key": request.public_key,
"secret_key": request.secret_key,
}
# Preserve masked fields
if existing_config and existing_config.value:
if is_mask_of(request.public_key, existing_config.value.get("public_key", "")):
config_value["public_key"] = existing_config.value["public_key"]
if is_mask_of(request.secret_key, existing_config.value.get("secret_key", "")):
config_value["secret_key"] = existing_config.value["secret_key"]
await db_client.upsert_configuration(
user.selected_organization_id,
OrganizationConfigurationKey.LANGFUSE_CREDENTIALS.value,
config_value,
)
# Broadcast to all workers so every process updates its in-memory exporter
await get_worker_sync_manager().broadcast(
WorkerSyncEventType.LANGFUSE_CREDENTIALS,
action="update",
2026-03-23 11:36:39 +05:30
org_id=user.selected_organization_id,
)
return {"message": "Langfuse credentials saved successfully"}
@router.delete("/langfuse-credentials")
async def delete_langfuse_credentials(user: UserModel = Depends(get_user)):
"""Delete Langfuse credentials for the user's organization."""
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
deleted = await db_client.delete_configuration(
user.selected_organization_id,
OrganizationConfigurationKey.LANGFUSE_CREDENTIALS.value,
)
if not deleted:
raise HTTPException(status_code=404, detail="No Langfuse credentials found")
# Broadcast to all workers so every process removes its in-memory exporter
await get_worker_sync_manager().broadcast(
WorkerSyncEventType.LANGFUSE_CREDENTIALS,
action="delete",
org_id=user.selected_organization_id,
)
2026-03-23 11:36:39 +05:30
return {"message": "Langfuse credentials deleted successfully"}
class RetryConfigResponse(BaseModel):
enabled: bool
max_retries: int
retry_delay_seconds: int
retry_on_busy: bool
retry_on_no_answer: bool
retry_on_voicemail: bool
class TimeSlotResponse(BaseModel):
day_of_week: int
start_time: str
end_time: str
class ScheduleConfigResponse(BaseModel):
enabled: bool
timezone: str
slots: List[TimeSlotResponse]
class CircuitBreakerConfigResponse(BaseModel):
enabled: bool = False
failure_threshold: float = 0.5
window_seconds: int = 120
min_calls_in_window: int = 5
class LastCampaignSettingsResponse(BaseModel):
retry_config: Optional[RetryConfigResponse] = None
max_concurrency: Optional[int] = None
schedule_config: Optional[ScheduleConfigResponse] = None
circuit_breaker: Optional[CircuitBreakerConfigResponse] = None
class CampaignDefaultsResponse(BaseModel):
concurrent_call_limit: int
from_numbers_count: int
default_retry_config: RetryConfigResponse
last_campaign_settings: Optional[LastCampaignSettingsResponse] = None
@router.get("/campaign-defaults", response_model=CampaignDefaultsResponse)
async def get_campaign_defaults(user: UserModel = Depends(get_user)):
"""Get campaign limits for the user's organization.
Returns the organization's concurrent call limit and default retry configuration.
"""
if not user.selected_organization_id:
raise HTTPException(status_code=400, detail="No organization selected")
# Get concurrent call limit
concurrent_limit = DEFAULT_ORG_CONCURRENCY_LIMIT
try:
config = await db_client.get_configuration(
user.selected_organization_id,
OrganizationConfigurationKey.CONCURRENT_CALL_LIMIT.value,
)
if config and config.value:
concurrent_limit = int(
config.value.get("value", DEFAULT_ORG_CONCURRENCY_LIMIT)
)
except Exception:
pass
# Phone-number count from the org's default telephony config (used by the
# campaign UI to validate max_concurrency against caller-id supply).
from_numbers_count = 0
try:
default_cfg = await db_client.get_default_telephony_configuration(
user.selected_organization_id
)
if default_cfg:
addresses = await db_client.list_active_normalized_addresses_for_config(
default_cfg.id
)
from_numbers_count = len(addresses)
except Exception:
pass
# Get last campaign settings for pre-population
last_campaign_settings = None
try:
last_campaign = await db_client.get_latest_campaign(
user.selected_organization_id
)
if last_campaign:
retry = None
if last_campaign.retry_config:
retry = RetryConfigResponse(**last_campaign.retry_config)
max_conc = None
sched = None
cb = CircuitBreakerConfigResponse()
if last_campaign.orchestrator_metadata:
max_conc = last_campaign.orchestrator_metadata.get("max_concurrency")
sc = last_campaign.orchestrator_metadata.get("schedule_config")
if sc:
sched = ScheduleConfigResponse(
enabled=sc.get("enabled", False),
timezone=sc.get("timezone", "UTC"),
slots=[
TimeSlotResponse(**slot) for slot in sc.get("slots", [])
],
)
cb_data = last_campaign.orchestrator_metadata.get("circuit_breaker")
if cb_data:
cb = CircuitBreakerConfigResponse(**cb_data)
else:
cb = CircuitBreakerConfigResponse()
last_campaign_settings = LastCampaignSettingsResponse(
retry_config=retry,
max_concurrency=max_conc,
schedule_config=sched,
circuit_breaker=cb,
)
except Exception:
pass
return CampaignDefaultsResponse(
concurrent_call_limit=concurrent_limit,
from_numbers_count=from_numbers_count,
default_retry_config=RetryConfigResponse(**DEFAULT_CAMPAIGN_RETRY_CONFIG),
last_campaign_settings=last_campaign_settings,
)