feat: add cloudonix cdr

This commit is contained in:
Abhishek Kumar 2026-01-29 17:45:46 +05:30
parent 91911769b0
commit 07558ec785
6 changed files with 211 additions and 10 deletions

View file

@ -0,0 +1,46 @@
"""add partial index on call_id for workflow run
Revision ID: d1dac4c93e61
Revises: 181475b2a1a1
Create Date: 2026-01-29 16:28:04.105202
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "d1dac4c93e61"
down_revision: Union[str, None] = "181475b2a1a1"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("ix_api_keys_key_hash"), table_name="api_keys")
op.create_index(op.f("ix_api_keys_key_hash"), "api_keys", ["key_hash"], unique=True)
op.create_index(
"idx_workflow_runs_call_id",
"workflow_runs",
[sa.literal_column("(gathered_context->>'call_id')")],
unique=False,
postgresql_where=sa.text("gathered_context->>'call_id' IS NOT NULL"),
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
"idx_workflow_runs_call_id",
table_name="workflow_runs",
postgresql_where=sa.text("gathered_context->>'call_id' IS NOT NULL"),
)
op.drop_index(op.f("ix_api_keys_key_hash"), table_name="api_keys")
op.create_index(
op.f("ix_api_keys_key_hash"), "api_keys", ["key_hash"], unique=False
)
# ### end Alembic commands ###

View file

@ -372,6 +372,11 @@ class WorkflowRunModel(Base):
unique=True,
postgresql_where=text("public_access_token IS NOT NULL"),
),
Index(
"idx_workflow_runs_call_id",
text("(gathered_context->>'call_id')"),
postgresql_where=text("gathered_context->>'call_id' IS NOT NULL"),
),
)

View file

@ -4,6 +4,7 @@ from sqlalchemy.future import select
from api.db.base_client import BaseDBClient
from api.db.models import OrganizationConfigurationModel
from api.enums import OrganizationConfigurationKey
class OrganizationConfigurationClient(BaseDBClient):
@ -94,3 +95,29 @@ class OrganizationConfigurationClient(BaseDBClient):
"""Get the value of a configuration, returning default if not found."""
config = await self.get_configuration(organization_id, key)
return config.value if config else default
async def get_organization_id_by_telephony_domain(
self, domain: str
) -> Optional[int]:
"""Find organization ID by domain_id in telephony configuration.
Args:
domain: The telephony domain to search for (e.g., Cloudonix domain_id)
Returns:
Organization ID if found, None otherwise
"""
async with self.async_session() as session:
result = await session.execute(
select(OrganizationConfigurationModel).where(
OrganizationConfigurationModel.key
== OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
)
)
configs = result.scalars().all()
for config in configs:
if config.value and config.value.get("domain_id") == domain:
return config.organization_id
return None

View file

@ -468,3 +468,30 @@ class WorkflowRunClient(BaseDBClient):
)
)
return result.scalars().first()
async def get_workflow_run_by_call_id(
self, call_id: str
) -> Optional[WorkflowRunModel]:
"""Find workflow run by call_id stored in gathered_context.
Args:
call_id: The telephony call ID to search for
Returns:
The WorkflowRunModel if found, None otherwise
"""
async with self.async_session() as session:
# Use JSON text extraction to find matching call_id
# This leverages the idx_workflow_runs_call_id index
result = await session.execute(
select(WorkflowRunModel)
.options(
joinedload(WorkflowRunModel.workflow).joinedload(WorkflowModel.user)
)
.where(
WorkflowRunModel.gathered_context.op("->>")("call_id") == call_id
)
.order_by(WorkflowRunModel.created_at.desc())
.limit(1)
)
return result.scalars().first()

View file

@ -100,6 +100,31 @@ class StatusCallbackRequest(BaseModel):
extra=data,
)
@classmethod
def from_cloudonix_cdr(cls, data: dict):
"""Convert Cloudonix CDR to generic format"""
# Map Cloudonix disposition to common format
disposition_map = {
"ANSWER": "completed",
"BUSY": "busy",
"CANCEL": "canceled",
"FAILED": "failed",
"CONGESTION": "failed",
"NOANSWER": "no-answer",
}
disposition = data.get("disposition", "")
status = disposition_map.get(disposition.upper(), disposition.lower())
return cls(
call_id=data.get("session").get("token"),
status=status,
from_number=data.get("from"),
to_number=data.get("to"),
duration=str(data.get("billsec") or data.get("duration") or 0),
extra=data,
)
@router.post("/initiate-call")
async def initiate_call(
@ -639,16 +664,22 @@ async def handle_twilio_status_callback(
)
# Process the status update
await _process_status_update(workflow_run_id, status_update, workflow_run)
await _process_status_update(workflow_run_id, status_update)
return {"status": "success"}
async def _process_status_update(
workflow_run_id: int, status: StatusCallbackRequest, workflow_run: any
):
async def _process_status_update(workflow_run_id: int, status: StatusCallbackRequest):
"""Process status updates from telephony providers."""
# Fetch fresh workflow_run to ensure we have the latest state
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.warning(
f"[run {workflow_run_id}] Workflow run not found in status update"
)
return
# Log the status callback
telephony_callback_logs = workflow_run.logs.get("telephony_status_callbacks", [])
telephony_callback_log = {
@ -666,7 +697,14 @@ async def _process_status_update(
logs={"telephony_status_callbacks": telephony_callback_logs},
)
# Handle call completion
# The workflow run state is already marked as completed from either status-update
# callbacks or CDR update callbacks. Lets skip processing.
if workflow_run.state == WorkflowRunState.COMPLETED.value:
return
# Handle call completion - make these updates idempotent - i.e
# they should handle multiple API calls (one due to status update,
# and other due to CDR updates.)
if status.status == "completed":
logger.info(
f"[run {workflow_run_id}] Call completed with duration: {status.duration}s"
@ -792,7 +830,7 @@ async def handle_vonage_events(
)
# Process the status update
await _process_status_update(workflow_run_id, status_update, workflow_run)
await _process_status_update(workflow_run_id, status_update)
# Return 204 No Content as expected by Vonage
return {"status": "ok"}
@ -943,7 +981,7 @@ async def handle_vobiz_hangup_callback(
)
# Process the status update
await _process_status_update(workflow_run_id, status_update, workflow_run)
await _process_status_update(workflow_run_id, status_update)
logger.info(f"[run {workflow_run_id}] Vobiz hangup callback processed successfully")
@ -1111,7 +1149,7 @@ async def handle_cloudonix_status_callback(
)
# Process the status update
await _process_status_update(workflow_run_id, status_update, workflow_run)
await _process_status_update(workflow_run_id, status_update)
return {"status": "success"}
@ -1235,7 +1273,7 @@ async def handle_vobiz_hangup_callback_by_workflow(
extra=parsed_data.get("extra", {}),
)
await _process_status_update(workflow_run_id, status, workflow_run)
await _process_status_update(workflow_run_id, status)
logger.info(
f"[run {workflow_run_id}] Vobiz hangup callback processed successfully"
@ -1389,3 +1427,61 @@ async def handle_inbound_fallback(request: Request):
)
return generic_hangup_response()
@router.post("/cloudonix/cdr")
async def handle_cloudonix_cdr(request: Request):
"""Handle Cloudonix CDR (Call Detail Record) webhooks.
Cloudonix sends CDR records when calls complete. The CDR contains:
- domain: Used to identify the organization
- call_id: Used to find the workflow run
- disposition: Call termination status (ANSWER, BUSY, CANCEL, FAILED, CONGESTION, NOANSWER)
- duration/billsec: Call duration information
"""
try:
cdr_data = await request.json()
except Exception as e:
logger.error(f"Failed to parse Cloudonix CDR JSON: {e}")
return {"status": "error", "message": "Invalid JSON payload"}
# Extract domain to find organization
domain = cdr_data.get("domain")
if not domain:
logger.warning("Cloudonix CDR missing domain field")
return {"status": "error", "message": "Missing domain field"}
# Extract call_id to find workflow run
call_id = cdr_data.get("session").get("token")
if not call_id:
logger.warning("Cloudonix CDR missing call_id field")
return {"status": "error", "message": "Missing call_id field"}
# Find organization by domain_id in telephony configuration
organization_id = await db_client.get_organization_id_by_telephony_domain(domain)
if not organization_id:
logger.warning(f"No organization found with Cloudonix domain: {domain}")
return {"status": "error", "message": "Organization not found for domain"}
# Find workflow run by call_id in gathered_context
workflow_run = await db_client.get_workflow_run_by_call_id(call_id)
if not workflow_run:
logger.warning(f"No workflow run found for Cloudonix call_id: {call_id}")
return {"status": "ignored", "reason": "workflow_run_not_found"}
workflow_run_id = workflow_run.id
set_current_run_id(workflow_run_id)
logger.info(f"[run {workflow_run_id}] Processing Cloudonix CDR for call {call_id}")
# Convert CDR to status update using StatusCallbackRequest
status_update = StatusCallbackRequest.from_cloudonix_cdr(cdr_data)
# Process the status update
await _process_status_update(workflow_run_id, status_update)
logger.info(
f"[run {workflow_run_id}] Cloudonix CDR processed successfully - "
f"disposition: {cdr_data.get('disposition')}, status: {status_update.status}"
)
return {"status": "success"}

View file

@ -185,7 +185,7 @@ class CloudonixProvider(TelephonyProvider):
call_id=session_token,
status="initiated",
provider_metadata={
"session_token": session_token,
"call_id": session_token,
"domain_id": domain_id,
"subscriber_id": subscriber_id,
},