diff --git a/api/services/telephony/providers/vobiz/routes.py b/api/services/telephony/providers/vobiz/routes.py index 15c2def..6e8e131 100644 --- a/api/services/telephony/providers/vobiz/routes.py +++ b/api/services/telephony/providers/vobiz/routes.py @@ -6,9 +6,8 @@ provider registry — see ProviderSpec.router. import json from datetime import UTC, datetime -from typing import Optional -from fastapi import APIRouter, Header, Request +from fastapi import APIRouter, HTTPException, Request from loguru import logger from pipecat.utils.run_context import set_current_run_id from starlette.responses import HTMLResponse @@ -29,6 +28,30 @@ from api.utils.telephony_helper import ( router = APIRouter() +async def _verify_vobiz_callback( + provider, + webhook_url: str, + callback_data: dict, + headers: dict, + raw_body: str, + *, + log_prefix: str, +) -> None: + """Verify a Vobiz callback signature, failing closed. + + Vobiz signs every callback, so a missing signature header is an invalid + request — ``provider.verify_inbound_signature`` returns ``False`` for both + missing and forged signatures. Reject with HTTP 403 (per Vobiz's + callback-validation docs) so the caller never reaches status processing. + """ + is_valid = await provider.verify_inbound_signature( + webhook_url, callback_data, headers, raw_body + ) + if not is_valid: + logger.warning(f"{log_prefix} Invalid or missing Vobiz callback signature") + raise HTTPException(status_code=403, detail="Invalid webhook signature") + + @router.post("/vobiz-xml", include_in_schema=False) async def handle_vobiz_xml_webhook( workflow_id: int, user_id: int, workflow_run_id: int, organization_id: int @@ -65,8 +88,6 @@ async def handle_vobiz_xml_webhook( async def handle_vobiz_hangup_callback( workflow_run_id: int, request: Request, - x_vobiz_signature: Optional[str] = Header(None), - x_vobiz_timestamp: Optional[str] = Header(None), ): """Handle Vobiz hangup callback (sent when call ends). @@ -75,82 +96,23 @@ async def handle_vobiz_hangup_callback( """ set_current_run_id(workflow_run_id) - # Logging all headers and body to understand what Vobiz actually sends all_headers = dict(request.headers) - logger.info( - f"[run {workflow_run_id}] Vobiz hangup callback - Headers: {json.dumps(all_headers)}" - ) # Parse the callback data from the raw body so signed webhooks can verify # the exact bytes Vobiz sent without draining the request stream first. callback_data, raw_body = await parse_webhook_request(request) - # TODO: Remove this debug logging after Vobiz team clarifies webhook authentication - logger.info( - f"[run {workflow_run_id}] Vobiz hangup callback - Body: {json.dumps(callback_data)}" - ) logger.info( f"[run {workflow_run_id}] Received Vobiz hangup callback {json.dumps(callback_data)}" ) - # Verify signature if Vobiz provided any supported signature header. - has_vobiz_signature = any( - header in all_headers - for header in ( - "x-vobiz-signature-v3", - "x-vobiz-signature-ma-v3", - "x-vobiz-signature-v2", - "x-vobiz-signature-ma-v2", - ) - ) - if has_vobiz_signature: - # We need the workflow run to get organization for provider credentials - workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) - if not workflow_run: - logger.warning( - f"[run {workflow_run_id}] Workflow run not found for signature verification" - ) - return {"status": "error", "reason": "workflow_run_not_found"} - - workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) - if not workflow: - logger.warning( - f"[run {workflow_run_id}] Workflow not found for signature verification" - ) - return {"status": "error", "reason": "workflow_not_found"} - - provider = await get_telephony_provider_for_run( - workflow_run, workflow.organization_id - ) - - # Verify signature - backend_endpoint, _ = await get_backend_endpoints() - webhook_url = f"{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/{workflow_run_id}" - - is_valid = await provider.verify_inbound_signature( - webhook_url, - callback_data, - all_headers, - raw_body, - ) - - if not is_valid: - logger.warning( - f"[run {workflow_run_id}] Invalid Vobiz hangup callback signature" - ) - return {"status": "error", "reason": "invalid_signature"} - - logger.info(f"[run {workflow_run_id}] Vobiz hangup callback signature verified") - else: - # Get workflow run for processing (signature verification already got it if needed) - workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) if not workflow_run: logger.warning( f"[run {workflow_run_id}] Workflow run not found for Vobiz hangup callback" ) return {"status": "ignored", "reason": "workflow_run_not_found"} - # Get workflow and provider workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) if not workflow: logger.warning(f"[run {workflow_run_id}] Workflow not found") @@ -160,6 +122,21 @@ async def handle_vobiz_hangup_callback( workflow_run, workflow.organization_id ) + # Fail closed: Vobiz signs every callback, so reject unsigned/forged ones + # before they can mutate call state. + backend_endpoint, _ = await get_backend_endpoints() + webhook_url = ( + f"{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/{workflow_run_id}" + ) + await _verify_vobiz_callback( + provider, + webhook_url, + callback_data, + all_headers, + raw_body, + log_prefix=f"[run {workflow_run_id}]", + ) + logger.debug( f"[run {workflow_run_id}] Processing Vobiz hangup with provider: {provider.PROVIDER_NAME}" ) @@ -167,10 +144,6 @@ async def handle_vobiz_hangup_callback( # Parse the callback data into generic format parsed_data = provider.parse_status_callback(callback_data) - logger.debug( - f"[run {workflow_run_id}] Parsed Vobiz callback data: {json.dumps(parsed_data)}" - ) - # Create StatusCallbackRequest from parsed data status_update = StatusCallbackRequest( call_id=parsed_data["call_id"], @@ -194,8 +167,6 @@ async def handle_vobiz_hangup_callback( async def handle_vobiz_ring_callback( workflow_run_id: int, request: Request, - x_vobiz_signature: Optional[str] = Header(None), - x_vobiz_timestamp: Optional[str] = Header(None), ): """Handle Vobiz ring callback (sent when call starts ringing). @@ -204,84 +175,46 @@ async def handle_vobiz_ring_callback( """ set_current_run_id(workflow_run_id) - # Logging all headers and body to understand what Vobiz actually sends all_headers = dict(request.headers) - logger.info( - f"[run {workflow_run_id}] Vobiz ring callback - Headers: {json.dumps(all_headers)}" - ) # Parse the callback data from the raw body so signed webhooks can verify # the exact bytes Vobiz sent without draining the request stream first. callback_data, raw_body = await parse_webhook_request(request) - # TODO: Remove this debug logging after Vobiz team clarifies webhook authentication - logger.info( - f"[run {workflow_run_id}] Vobiz ring callback - Body: {json.dumps(callback_data)}" - ) - logger.info( f"[run {workflow_run_id}] Received Vobiz ring callback {json.dumps(callback_data)}" ) - # Verify signature if Vobiz provided any supported signature header. - has_vobiz_signature = any( - header in all_headers - for header in ( - "x-vobiz-signature-v3", - "x-vobiz-signature-ma-v3", - "x-vobiz-signature-v2", - "x-vobiz-signature-ma-v2", - ) - ) - if has_vobiz_signature: - # We need the workflow run to get organization for provider credentials - workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) - if not workflow_run: - logger.warning( - f"[run {workflow_run_id}] Workflow run not found for signature verification" - ) - return {"status": "error", "reason": "workflow_run_not_found"} - - workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) - if not workflow: - logger.warning( - f"[run {workflow_run_id}] Workflow not found for signature verification" - ) - return {"status": "error", "reason": "workflow_not_found"} - - provider = await get_telephony_provider_for_run( - workflow_run, workflow.organization_id - ) - - # Verify signature - backend_endpoint, _ = await get_backend_endpoints() - webhook_url = ( - f"{backend_endpoint}/api/v1/telephony/vobiz/ring-callback/{workflow_run_id}" - ) - - is_valid = await provider.verify_inbound_signature( - webhook_url, - callback_data, - all_headers, - raw_body, - ) - - if not is_valid: - logger.warning( - f"[run {workflow_run_id}] Invalid Vobiz ring callback signature" - ) - return {"status": "error", "reason": "invalid_signature"} - - logger.info(f"[run {workflow_run_id}] Vobiz ring callback signature verified") - else: - # Get workflow run for processing (signature verification already got it if needed) - workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) if not workflow_run: logger.warning( f"[run {workflow_run_id}] Workflow run not found for Vobiz ring callback" ) return {"status": "ignored", "reason": "workflow_run_not_found"} + workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) + if not workflow: + logger.warning(f"[run {workflow_run_id}] Workflow not found") + return {"status": "ignored", "reason": "workflow_not_found"} + + provider = await get_telephony_provider_for_run( + workflow_run, workflow.organization_id + ) + + # Fail closed: reject unsigned/forged ring callbacks before logging them. + backend_endpoint, _ = await get_backend_endpoints() + webhook_url = ( + f"{backend_endpoint}/api/v1/telephony/vobiz/ring-callback/{workflow_run_id}" + ) + await _verify_vobiz_callback( + provider, + webhook_url, + callback_data, + all_headers, + raw_body, + log_prefix=f"[run {workflow_run_id}]", + ) + # Log the ringing event telephony_callback_logs = workflow_run.logs.get("telephony_status_callbacks", []) ring_log = { @@ -308,15 +241,10 @@ async def handle_vobiz_ring_callback( async def handle_vobiz_hangup_callback_by_workflow( workflow_id: int, request: Request, - x_vobiz_signature: Optional[str] = Header(None), - x_vobiz_timestamp: Optional[str] = Header(None), ): """Handle Vobiz hangup callback with workflow_id - finds workflow run by call_id.""" all_headers = dict(request.headers) - logger.info( - f"[workflow {workflow_id}] Vobiz hangup callback - Headers: {json.dumps(all_headers)}" - ) try: callback_data, raw_body = await parse_webhook_request(request) @@ -364,35 +292,18 @@ async def handle_vobiz_hangup_callback_by_workflow( workflow_run, workflow.organization_id ) - has_vobiz_signature = any( - header in all_headers - for header in ( - "x-vobiz-signature-v3", - "x-vobiz-signature-ma-v3", - "x-vobiz-signature-v2", - "x-vobiz-signature-ma-v2", - ) + # Fail closed: Vobiz signs every callback, so reject unsigned/forged ones + # before they can mutate call state. + backend_endpoint, _ = await get_backend_endpoints() + webhook_url = f"{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/workflow/{workflow_id}" + await _verify_vobiz_callback( + provider, + webhook_url, + callback_data, + all_headers, + raw_body, + log_prefix=f"[workflow {workflow_id}]", ) - if has_vobiz_signature: - backend_endpoint, _ = await get_backend_endpoints() - webhook_url = f"{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/workflow/{workflow_id}" - - is_valid = await provider.verify_inbound_signature( - webhook_url, - callback_data, - all_headers, - raw_body, - ) - - if not is_valid: - logger.warning( - f"[workflow {workflow_id}] Invalid Vobiz hangup callback signature" - ) - return {"status": "error", "message": "invalid_signature"} - - logger.info( - f"[workflow {workflow_id}] Vobiz hangup callback signature verified" - ) try: parsed_data = provider.parse_status_callback(callback_data) diff --git a/api/tests/telephony/vobiz/test_routes.py b/api/tests/telephony/vobiz/test_routes.py index 7b80d46..cfccfc9 100644 --- a/api/tests/telephony/vobiz/test_routes.py +++ b/api/tests/telephony/vobiz/test_routes.py @@ -6,11 +6,13 @@ from unittest.mock import AsyncMock, patch from urllib.parse import urlencode import pytest +from fastapi import HTTPException from starlette.requests import Request from api.services.telephony.providers.vobiz.provider import VobizProvider from api.services.telephony.providers.vobiz.routes import ( handle_vobiz_hangup_callback, + handle_vobiz_hangup_callback_by_workflow, handle_vobiz_ring_callback, ) @@ -225,3 +227,154 @@ async def test_vobiz_verify_inbound_signature_rejects_missing_signature(): {}, {}, ) + + +@pytest.mark.asyncio +async def test_vobiz_hangup_callback_rejects_missing_signature(): + """An unsigned hangup callback must be rejected before status processing.""" + provider = _provider() + form_data = { + "CallUUID": "call-123", + "CallStatus": "completed", + "From": "15551230001", + "To": "15551230002", + "Direction": "outbound", + "Duration": "12", + } + # No x-vobiz-signature-* headers — the callback is unsigned. + request = _request( + path="/api/v1/telephony/vobiz/hangup-callback/123", + form_data=form_data, + ) + + with ( + patch("api.services.telephony.providers.vobiz.routes.db_client") as db_client, + patch( + "api.services.telephony.providers.vobiz.routes.get_telephony_provider_for_run", + new_callable=AsyncMock, + return_value=provider, + ), + patch( + "api.services.telephony.providers.vobiz.routes.get_backend_endpoints", + new_callable=AsyncMock, + return_value=("https://example.test", "wss://example.test"), + ), + patch( + "api.services.telephony.providers.vobiz.routes._process_status_update", + new_callable=AsyncMock, + ) as process_status, + ): + db_client.get_workflow_run_by_id = AsyncMock( + return_value=SimpleNamespace(workflow_id=7) + ) + db_client.get_workflow_by_id = AsyncMock( + return_value=SimpleNamespace(organization_id=11) + ) + + with pytest.raises(HTTPException) as exc_info: + await handle_vobiz_hangup_callback( + workflow_run_id=123, + request=request, + ) + + assert exc_info.value.status_code == 403 + process_status.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_vobiz_ring_callback_rejects_missing_signature(): + """An unsigned ring callback must be rejected before it is logged.""" + provider = _provider() + form_data = { + "CallUUID": "call-123", + "CallStatus": "ringing", + "From": "15551230001", + "To": "15551230002", + } + # No x-vobiz-signature-* headers — the callback is unsigned. + request = _request( + path="/api/v1/telephony/vobiz/ring-callback/123", + form_data=form_data, + ) + + workflow_run = SimpleNamespace(workflow_id=7, logs={}) + + with ( + patch("api.services.telephony.providers.vobiz.routes.db_client") as db_client, + patch( + "api.services.telephony.providers.vobiz.routes.get_telephony_provider_for_run", + new_callable=AsyncMock, + return_value=provider, + ), + patch( + "api.services.telephony.providers.vobiz.routes.get_backend_endpoints", + new_callable=AsyncMock, + return_value=("https://example.test", "wss://example.test"), + ), + ): + db_client.get_workflow_run_by_id = AsyncMock(return_value=workflow_run) + db_client.get_workflow_by_id = AsyncMock( + return_value=SimpleNamespace(organization_id=11) + ) + db_client.update_workflow_run = AsyncMock() + + with pytest.raises(HTTPException) as exc_info: + await handle_vobiz_ring_callback( + workflow_run_id=123, + request=request, + ) + + assert exc_info.value.status_code == 403 + db_client.update_workflow_run.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_vobiz_hangup_callback_by_workflow_rejects_missing_signature(): + """An unsigned by-workflow hangup callback must be rejected before processing.""" + provider = _provider() + form_data = { + "CallUUID": "call-123", + "CallStatus": "completed", + "From": "15551230001", + "To": "15551230002", + "Direction": "outbound", + "Duration": "12", + } + # No x-vobiz-signature-* headers — the callback is unsigned. + request = _request( + path="/api/v1/telephony/vobiz/hangup-callback/workflow/7", + form_data=form_data, + ) + + with ( + patch("api.services.telephony.providers.vobiz.routes.db_client") as db_client, + patch( + "api.services.telephony.providers.vobiz.routes.get_telephony_provider_for_run", + new_callable=AsyncMock, + return_value=provider, + ), + patch( + "api.services.telephony.providers.vobiz.routes.get_backend_endpoints", + new_callable=AsyncMock, + return_value=("https://example.test", "wss://example.test"), + ), + patch( + "api.services.telephony.providers.vobiz.routes._process_status_update", + new_callable=AsyncMock, + ) as process_status, + ): + db_client.get_workflow_by_id = AsyncMock( + return_value=SimpleNamespace(organization_id=11) + ) + db_client.get_workflow_run_by_call_id = AsyncMock( + return_value=SimpleNamespace(id=123, workflow_id=7) + ) + + with pytest.raises(HTTPException) as exc_info: + await handle_vobiz_hangup_callback_by_workflow( + workflow_id=7, + request=request, + ) + + assert exc_info.value.status_code == 403 + process_status.assert_not_awaited()