diff --git a/api/alembic/versions/d1dac4c93e61_add_partial_index_on_call_id_for_.py b/api/alembic/versions/d1dac4c93e61_add_partial_index_on_call_id_for_.py new file mode 100644 index 0000000..a6be1bf --- /dev/null +++ b/api/alembic/versions/d1dac4c93e61_add_partial_index_on_call_id_for_.py @@ -0,0 +1,46 @@ +"""add partial index on call_id for workflow run + +Revision ID: d1dac4c93e61 +Revises: 181475b2a1a1 +Create Date: 2026-01-29 16:28:04.105202 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "d1dac4c93e61" +down_revision: Union[str, None] = "181475b2a1a1" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f("ix_api_keys_key_hash"), table_name="api_keys") + op.create_index(op.f("ix_api_keys_key_hash"), "api_keys", ["key_hash"], unique=True) + op.create_index( + "idx_workflow_runs_call_id", + "workflow_runs", + [sa.literal_column("(gathered_context->>'call_id')")], + unique=False, + postgresql_where=sa.text("gathered_context->>'call_id' IS NOT NULL"), + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index( + "idx_workflow_runs_call_id", + table_name="workflow_runs", + postgresql_where=sa.text("gathered_context->>'call_id' IS NOT NULL"), + ) + op.drop_index(op.f("ix_api_keys_key_hash"), table_name="api_keys") + op.create_index( + op.f("ix_api_keys_key_hash"), "api_keys", ["key_hash"], unique=False + ) + # ### end Alembic commands ### diff --git a/api/db/models.py b/api/db/models.py index 3ba0ae7..3cafdcd 100644 --- a/api/db/models.py +++ b/api/db/models.py @@ -372,6 +372,11 @@ class WorkflowRunModel(Base): unique=True, postgresql_where=text("public_access_token IS NOT NULL"), ), + Index( + "idx_workflow_runs_call_id", + text("(gathered_context->>'call_id')"), + postgresql_where=text("gathered_context->>'call_id' IS NOT NULL"), + ), ) diff --git a/api/db/organization_configuration_client.py b/api/db/organization_configuration_client.py index a169b89..521b3f1 100644 --- a/api/db/organization_configuration_client.py +++ b/api/db/organization_configuration_client.py @@ -4,6 +4,7 @@ from sqlalchemy.future import select from api.db.base_client import BaseDBClient from api.db.models import OrganizationConfigurationModel +from api.enums import OrganizationConfigurationKey class OrganizationConfigurationClient(BaseDBClient): @@ -94,3 +95,29 @@ class OrganizationConfigurationClient(BaseDBClient): """Get the value of a configuration, returning default if not found.""" config = await self.get_configuration(organization_id, key) return config.value if config else default + + async def get_organization_id_by_telephony_domain( + self, domain: str + ) -> Optional[int]: + """Find organization ID by domain_id in telephony configuration. + + Args: + domain: The telephony domain to search for (e.g., Cloudonix domain_id) + + Returns: + Organization ID if found, None otherwise + """ + async with self.async_session() as session: + result = await session.execute( + select(OrganizationConfigurationModel).where( + OrganizationConfigurationModel.key + == OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value, + ) + ) + configs = result.scalars().all() + + for config in configs: + if config.value and config.value.get("domain_id") == domain: + return config.organization_id + + return None diff --git a/api/db/workflow_run_client.py b/api/db/workflow_run_client.py index 604a431..3ec1a18 100644 --- a/api/db/workflow_run_client.py +++ b/api/db/workflow_run_client.py @@ -468,3 +468,30 @@ class WorkflowRunClient(BaseDBClient): ) ) return result.scalars().first() + + async def get_workflow_run_by_call_id( + self, call_id: str + ) -> Optional[WorkflowRunModel]: + """Find workflow run by call_id stored in gathered_context. + + Args: + call_id: The telephony call ID to search for + + Returns: + The WorkflowRunModel if found, None otherwise + """ + async with self.async_session() as session: + # Use JSON text extraction to find matching call_id + # This leverages the idx_workflow_runs_call_id index + result = await session.execute( + select(WorkflowRunModel) + .options( + joinedload(WorkflowRunModel.workflow).joinedload(WorkflowModel.user) + ) + .where( + WorkflowRunModel.gathered_context.op("->>")("call_id") == call_id + ) + .order_by(WorkflowRunModel.created_at.desc()) + .limit(1) + ) + return result.scalars().first() diff --git a/api/routes/telephony.py b/api/routes/telephony.py index 70aada7..b0f4a42 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -100,6 +100,31 @@ class StatusCallbackRequest(BaseModel): extra=data, ) + @classmethod + def from_cloudonix_cdr(cls, data: dict): + """Convert Cloudonix CDR to generic format""" + # Map Cloudonix disposition to common format + disposition_map = { + "ANSWER": "completed", + "BUSY": "busy", + "CANCEL": "canceled", + "FAILED": "failed", + "CONGESTION": "failed", + "NOANSWER": "no-answer", + } + + disposition = data.get("disposition", "") + status = disposition_map.get(disposition.upper(), disposition.lower()) + + return cls( + call_id=data.get("session").get("token"), + status=status, + from_number=data.get("from"), + to_number=data.get("to"), + duration=str(data.get("billsec") or data.get("duration") or 0), + extra=data, + ) + @router.post("/initiate-call") async def initiate_call( @@ -639,16 +664,22 @@ async def handle_twilio_status_callback( ) # Process the status update - await _process_status_update(workflow_run_id, status_update, workflow_run) + await _process_status_update(workflow_run_id, status_update) return {"status": "success"} -async def _process_status_update( - workflow_run_id: int, status: StatusCallbackRequest, workflow_run: any -): +async def _process_status_update(workflow_run_id: int, status: StatusCallbackRequest): """Process status updates from telephony providers.""" + # Fetch fresh workflow_run to ensure we have the latest state + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + if not workflow_run: + logger.warning( + f"[run {workflow_run_id}] Workflow run not found in status update" + ) + return + # Log the status callback telephony_callback_logs = workflow_run.logs.get("telephony_status_callbacks", []) telephony_callback_log = { @@ -666,7 +697,14 @@ async def _process_status_update( logs={"telephony_status_callbacks": telephony_callback_logs}, ) - # Handle call completion + # The workflow run state is already marked as completed from either status-update + # callbacks or CDR update callbacks. Lets skip processing. + if workflow_run.state == WorkflowRunState.COMPLETED.value: + return + + # Handle call completion - make these updates idempotent - i.e + # they should handle multiple API calls (one due to status update, + # and other due to CDR updates.) if status.status == "completed": logger.info( f"[run {workflow_run_id}] Call completed with duration: {status.duration}s" @@ -792,7 +830,7 @@ async def handle_vonage_events( ) # Process the status update - await _process_status_update(workflow_run_id, status_update, workflow_run) + await _process_status_update(workflow_run_id, status_update) # Return 204 No Content as expected by Vonage return {"status": "ok"} @@ -943,7 +981,7 @@ async def handle_vobiz_hangup_callback( ) # Process the status update - await _process_status_update(workflow_run_id, status_update, workflow_run) + await _process_status_update(workflow_run_id, status_update) logger.info(f"[run {workflow_run_id}] Vobiz hangup callback processed successfully") @@ -1111,7 +1149,7 @@ async def handle_cloudonix_status_callback( ) # Process the status update - await _process_status_update(workflow_run_id, status_update, workflow_run) + await _process_status_update(workflow_run_id, status_update) return {"status": "success"} @@ -1235,7 +1273,7 @@ async def handle_vobiz_hangup_callback_by_workflow( extra=parsed_data.get("extra", {}), ) - await _process_status_update(workflow_run_id, status, workflow_run) + await _process_status_update(workflow_run_id, status) logger.info( f"[run {workflow_run_id}] Vobiz hangup callback processed successfully" @@ -1389,3 +1427,61 @@ async def handle_inbound_fallback(request: Request): ) return generic_hangup_response() + + +@router.post("/cloudonix/cdr") +async def handle_cloudonix_cdr(request: Request): + """Handle Cloudonix CDR (Call Detail Record) webhooks. + + Cloudonix sends CDR records when calls complete. The CDR contains: + - domain: Used to identify the organization + - call_id: Used to find the workflow run + - disposition: Call termination status (ANSWER, BUSY, CANCEL, FAILED, CONGESTION, NOANSWER) + - duration/billsec: Call duration information + """ + try: + cdr_data = await request.json() + except Exception as e: + logger.error(f"Failed to parse Cloudonix CDR JSON: {e}") + return {"status": "error", "message": "Invalid JSON payload"} + + # Extract domain to find organization + domain = cdr_data.get("domain") + if not domain: + logger.warning("Cloudonix CDR missing domain field") + return {"status": "error", "message": "Missing domain field"} + + # Extract call_id to find workflow run + call_id = cdr_data.get("session").get("token") + if not call_id: + logger.warning("Cloudonix CDR missing call_id field") + return {"status": "error", "message": "Missing call_id field"} + + # Find organization by domain_id in telephony configuration + organization_id = await db_client.get_organization_id_by_telephony_domain(domain) + if not organization_id: + logger.warning(f"No organization found with Cloudonix domain: {domain}") + return {"status": "error", "message": "Organization not found for domain"} + + # Find workflow run by call_id in gathered_context + workflow_run = await db_client.get_workflow_run_by_call_id(call_id) + if not workflow_run: + logger.warning(f"No workflow run found for Cloudonix call_id: {call_id}") + return {"status": "ignored", "reason": "workflow_run_not_found"} + + workflow_run_id = workflow_run.id + set_current_run_id(workflow_run_id) + logger.info(f"[run {workflow_run_id}] Processing Cloudonix CDR for call {call_id}") + + # Convert CDR to status update using StatusCallbackRequest + status_update = StatusCallbackRequest.from_cloudonix_cdr(cdr_data) + + # Process the status update + await _process_status_update(workflow_run_id, status_update) + + logger.info( + f"[run {workflow_run_id}] Cloudonix CDR processed successfully - " + f"disposition: {cdr_data.get('disposition')}, status: {status_update.status}" + ) + + return {"status": "success"} diff --git a/api/services/telephony/providers/cloudonix_provider.py b/api/services/telephony/providers/cloudonix_provider.py index 7126302..3678617 100644 --- a/api/services/telephony/providers/cloudonix_provider.py +++ b/api/services/telephony/providers/cloudonix_provider.py @@ -185,7 +185,7 @@ class CloudonixProvider(TelephonyProvider): call_id=session_token, status="initiated", provider_metadata={ - "session_token": session_token, + "call_id": session_token, "domain_id": domain_id, "subscriber_id": subscriber_id, },