"""Cloudonix telephony routes (webhooks, status callbacks, answer URLs). Mounted under ``/api/v1/telephony`` by ``api.routes.telephony`` via the provider registry — see ProviderSpec.router. """ import json from fastapi import APIRouter, Request from loguru import logger from pipecat.utils.run_context import set_current_run_id from api.db import db_client from api.services.telephony.factory import get_telephony_provider_for_run from api.services.telephony.status_processor import ( StatusCallbackRequest, _process_status_update, ) router = APIRouter() @router.post("/cloudonix/status-callback/{workflow_run_id}") async def handle_cloudonix_status_callback( workflow_run_id: int, request: Request, ): """Handle Cloudonix-specific status callbacks. Cloudonix sends call status updates to the callback URL specified during call initiation. """ set_current_run_id(workflow_run_id) # Parse callback data - determine if JSON or form data content_type = request.headers.get("content-type", "") if "application/json" in content_type: callback_data = await request.json() else: # Assume form data (like Twilio) form_data = await request.form() callback_data = dict(form_data) logger.info( f"[run {workflow_run_id}] Received Cloudonix status callback: {json.dumps(callback_data)}" ) # Get workflow run to find organization workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) if not workflow_run: logger.warning(f"Workflow run {workflow_run_id} not found for status callback") return {"status": "ignored", "reason": "workflow_run_not_found"} # Get workflow and provider workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) if not workflow: logger.warning(f"Workflow {workflow_run.workflow_id} not found") return {"status": "ignored", "reason": "workflow_not_found"} provider = await get_telephony_provider_for_run( workflow_run, workflow.organization_id ) # Parse the callback data into generic format parsed_data = provider.parse_status_callback(callback_data) # Create StatusCallbackRequest from parsed data status_update = StatusCallbackRequest( call_id=parsed_data["call_id"], status=parsed_data["status"], from_number=parsed_data.get("from_number"), to_number=parsed_data.get("to_number"), direction=parsed_data.get("direction"), duration=parsed_data.get("duration"), extra=parsed_data.get("extra", {}), ) # Process the status update await _process_status_update(workflow_run_id, status_update) return {"status": "success"} @router.post("/cloudonix/cdr") async def handle_cloudonix_cdr(request: Request): """Handle Cloudonix CDR (Call Detail Record) webhooks. Cloudonix sends CDR records when calls complete. The CDR contains: - domain: Used to identify the organization - call_id: Used to find the workflow run - disposition: Call termination status (ANSWER, BUSY, CANCEL, FAILED, CONGESTION, NOANSWER) - duration/billsec: Call duration information """ try: cdr_data = await request.json() except Exception as e: logger.error(f"Failed to parse Cloudonix CDR JSON: {e}") return {"status": "error", "message": "Invalid JSON payload"} # Extract domain to find organization domain = cdr_data.get("domain") if not domain: logger.warning("Cloudonix CDR missing domain field") return {"status": "error", "message": "Missing domain field"} # Extract call_id to find workflow run call_id = cdr_data.get("session").get("token") logger.info(f"Cloudonix CDR data for call id {call_id} - {cdr_data}") if not call_id: logger.warning("Cloudonix CDR missing call_id field") return {"status": "error", "message": "Missing call_id field"} # Find workflow run by call_id in gathered_context workflow_run = await db_client.get_workflow_run_by_call_id(call_id) if not workflow_run: logger.warning(f"No workflow run found for Cloudonix call_id: {call_id}") return {"status": "ignored", "reason": "workflow_run_not_found"} workflow_run_id = workflow_run.id set_current_run_id(workflow_run_id) logger.info(f"[run {workflow_run_id}] Processing Cloudonix CDR for call {call_id}") # Convert CDR to status update using StatusCallbackRequest status_update = StatusCallbackRequest.from_cloudonix_cdr(cdr_data) # Process the status update await _process_status_update(workflow_run_id, status_update) logger.info( f"[run {workflow_run_id}] Cloudonix CDR processed successfully - " f"disposition: {cdr_data.get('disposition')}, status: {status_update.status}" ) return {"status": "success"}