Merge remote-tracking branch 'dograh-main' into fix/378-disable-duplicate-trigger-nodes

This commit is contained in:
Varun Nuthalapati 2026-06-08 09:45:10 -07:00
commit af5818d63a
26 changed files with 1022 additions and 311 deletions

View file

@ -1,3 +1,3 @@
{
".": "1.33.0"
".": "1.34.0"
}

View file

@ -1,5 +1,33 @@
# Changelog
## 1.34.0 (2026-06-03)
<!-- Release notes generated using configuration in .github/release.yml at main -->
## What's Changed
### Features
* feat: add mcp guides for various topic and stages for bot building by @a6kme in https://github.com/dograh-hq/dograh/pull/380
* feat: allow overriding base URL of OpenAI STT and TTS by @developer603 in https://github.com/dograh-hq/dograh/pull/377
* feat: add Azure AI multi-provider support (TTS, STT, Embeddings, Realtime) by @vishaldhateria in https://github.com/dograh-hq/dograh/pull/381
### Bug Fixes
* fix: support object and array parameters in custom HTTP tools by @mvanhorn in https://github.com/dograh-hq/dograh/pull/373
* fix(telephony): resolve transfer context via call-sid index instead of KEYS scan by @shiminshen in https://github.com/dograh-hq/dograh/pull/387
* fix(webrtc): enforce embed allowed-domain policy on public signaling websocket by @shiminshen in https://github.com/dograh-hq/dograh/pull/388
* fix: use runtime BACKEND_URL for proxying by @a6kme in https://github.com/dograh-hq/dograh/pull/411
* fix: add CORS preflight handler and ACAO header for embed config endpoint by @nuthalapativarun in https://github.com/dograh-hq/dograh/pull/403
### Other Changes
* Add Sarvam LLM, update Sarvam STT models, expose usage_info on run detail by @abhaybabbar in https://github.com/dograh-hq/dograh/pull/351
* fix: make email lookup case-insensitive in get_user_by_email by @developer603 in https://github.com/dograh-hq/dograh/pull/397
## New Contributors
* @abhaybabbar made their first contribution in https://github.com/dograh-hq/dograh/pull/351
* @mvanhorn made their first contribution in https://github.com/dograh-hq/dograh/pull/373
* @developer603 made their first contribution in https://github.com/dograh-hq/dograh/pull/377
* @vishaldhateria made their first contribution in https://github.com/dograh-hq/dograh/pull/381
* @shiminshen made their first contribution in https://github.com/dograh-hq/dograh/pull/387
**Full Changelog**: https://github.com/dograh-hq/dograh/compare/dograh-v1.33.0...dograh-v1.34.0
## [1.33.0](https://github.com/dograh-hq/dograh/compare/dograh-v1.32.0...dograh-v1.33.0) (2026-05-31)

View file

@ -5,28 +5,38 @@ Revises: 6bd9f67ec994
Create Date: 2026-06-02 07:58:00.002359
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = '384be6596b36'
down_revision: Union[str, None] = '6bd9f67ec994'
revision: str = "384be6596b36"
down_revision: Union[str, None] = "6bd9f67ec994"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_users_email'), table_name='users')
op.create_index('ix_users_email_lower', 'users', [sa.literal_column('lower(email)')], unique=True, postgresql_where=sa.text('email IS NOT NULL'))
op.drop_index(op.f("ix_users_email"), table_name="users")
op.create_index(
"ix_users_email_lower",
"users",
[sa.literal_column("lower(email)")],
unique=True,
postgresql_where=sa.text("email IS NOT NULL"),
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('ix_users_email_lower', table_name='users', postgresql_where=sa.text('email IS NOT NULL'))
op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
op.drop_index(
"ix_users_email_lower",
table_name="users",
postgresql_where=sa.text("email IS NOT NULL"),
)
op.create_index(op.f("ix_users_email"), "users", ["email"], unique=True)
# ### end Alembic commands ###

View file

@ -117,6 +117,15 @@ app.add_middleware(
allow_headers=["*"],
)
def _add_public_embed_cors_middleware() -> None:
from api.routes.public_embed import PublicEmbedCORSMiddleware
app.add_middleware(PublicEmbedCORSMiddleware, api_prefix=API_PREFIX)
_add_public_embed_cors_middleware()
api_router = APIRouter()
# include subrouters here

View file

@ -1,5 +1,5 @@
[project]
name = "dograh-api"
version = "1.33.0"
version = "1.34.0"
description = "Backend API for Dograh voice AI platform"
requires-python = ">=3.13,<3.14"

View file

@ -7,6 +7,7 @@ They handle CORS, domain validation, and session management for embedded workflo
import secrets
from datetime import UTC, datetime, timedelta
from typing import Optional
from urllib.parse import urlsplit
from fastapi import (
APIRouter,
@ -16,6 +17,8 @@ from fastapi import (
)
from loguru import logger
from pydantic import BaseModel
from starlette.datastructures import Headers
from starlette.types import ASGIApp, Receive, Scope, Send
from api.db import db_client
from api.enums import WorkflowRunMode
@ -27,6 +30,9 @@ from api.routes.turn_credentials import (
router = APIRouter(prefix="/public/embed")
EMBED_CORS_ALLOW_HEADERS = "Content-Type, Origin"
EMBED_CORS_MAX_AGE = "86400"
class InitEmbedRequest(BaseModel):
"""Request model for initializing an embed session"""
@ -70,11 +76,9 @@ def validate_origin(origin: str, allowed_domains: list) -> bool:
# If no domains specified, allow all origins
return True
# Extract domain from origin (remove protocol)
if "://" in origin:
domain = origin.split("://")[1].split("/")[0].split(":")[0]
else:
domain = origin
domain, origin_port = _parse_origin_host_port(origin)
if not domain:
return False
# Normalize domain for www matching
def normalize_www(d: str) -> tuple[str, str]:
@ -87,16 +91,23 @@ def validate_origin(origin: str, allowed_domains: list) -> bool:
domain_variants = normalize_www(domain)
for allowed in allowed_domains:
allowed = str(allowed).strip().lower()
if allowed == "*":
return True
elif allowed.startswith("*."):
allowed_domain, allowed_port = _parse_origin_host_port(allowed)
if not allowed_domain:
continue
if allowed_port is not None and allowed_port != origin_port:
continue
if allowed_domain.startswith("*."):
# Wildcard subdomain matching
base_domain = allowed[2:]
base_domain = allowed_domain[2:]
if domain == base_domain or domain.endswith("." + base_domain):
return True
else:
# Check both www and non-www versions
allowed_variants = normalize_www(allowed)
allowed_variants = normalize_www(allowed_domain)
# If any variant of domain matches any variant of allowed, it's valid
if any(
dv in allowed_variants or av in domain_variants
@ -108,6 +119,24 @@ def validate_origin(origin: str, allowed_domains: list) -> bool:
return False
def _parse_origin_host_port(value: str) -> tuple[str, str | None]:
candidate = value.strip().lower()
if not candidate:
return "", None
if "://" not in candidate and not candidate.startswith("//"):
candidate = f"//{candidate}"
parsed = urlsplit(candidate)
try:
parsed_port = parsed.port
except ValueError:
parsed_port = None
port = str(parsed_port) if parsed_port is not None else None
return (parsed.hostname or "").rstrip("."), port
def generate_session_token() -> str:
"""Generate a cryptographically secure session token"""
return f"emb_session_{secrets.token_urlsafe(32)}"
@ -121,8 +150,120 @@ def get_request_origin(request: Request) -> str:
return origin
def _cors_response(origin: str, methods: str) -> Response:
return Response(
headers={
"Access-Control-Allow-Origin": origin,
"Access-Control-Allow-Methods": methods,
"Access-Control-Allow-Headers": EMBED_CORS_ALLOW_HEADERS,
"Access-Control-Max-Age": EMBED_CORS_MAX_AGE,
"Vary": "Origin",
}
)
def _allow_embed_origin(response: Response, origin: str) -> None:
response.headers["Access-Control-Allow-Origin"] = origin
vary = response.headers.get("Vary")
if not vary:
response.headers["Vary"] = "Origin"
return
vary_values = {value.strip().lower() for value in vary.split(",")}
if "origin" not in vary_values:
response.headers["Vary"] = f"{vary}, Origin"
async def _config_preflight_response(token: str, origin: str) -> Response:
embed_token = await db_client.get_embed_token_by_token(token)
if not embed_token or not embed_token.is_active:
return Response(status_code=403)
if not validate_origin(origin, embed_token.allowed_domains or []):
return Response(status_code=403)
return _cors_response(origin, "GET, OPTIONS")
async def _turn_credentials_preflight_response(
session_token: str, origin: str
) -> Response:
embed_session = await db_client.get_embed_session_by_token(session_token)
if not embed_session:
return Response(status_code=403)
if embed_session.expires_at and embed_session.expires_at < datetime.now(UTC):
return Response(status_code=403)
embed_token = await db_client.get_embed_token_by_id(embed_session.embed_token_id)
if not embed_token:
return Response(status_code=403)
if not validate_origin(origin, embed_token.allowed_domains or []):
return Response(status_code=403)
return _cors_response(origin, "GET, OPTIONS")
async def build_public_embed_preflight_response(
path: str, origin: str, requested_method: str, api_prefix: str = "/api/v1"
) -> Response | None:
"""Handle embed preflights before global CORSMiddleware rejects external sites."""
public_embed_prefix = f"{api_prefix.rstrip('/')}/public/embed"
if path == f"{public_embed_prefix}/init":
if requested_method.upper() != "POST":
return Response(status_code=405)
return _cors_response(origin, "POST, OPTIONS")
config_prefix = f"{public_embed_prefix}/config/"
if path.startswith(config_prefix):
if requested_method.upper() != "GET":
return Response(status_code=405)
token = path[len(config_prefix) :].split("/", 1)[0]
return await _config_preflight_response(token, origin)
turn_credentials_prefix = f"{public_embed_prefix}/turn-credentials/"
if path.startswith(turn_credentials_prefix):
if requested_method.upper() != "GET":
return Response(status_code=405)
session_token = path[len(turn_credentials_prefix) :].split("/", 1)[0]
return await _turn_credentials_preflight_response(session_token, origin)
return None
class PublicEmbedCORSMiddleware:
"""Allow token-gated embed CORS before global SaaS CORS rejects preflights."""
def __init__(self, app: ASGIApp, api_prefix: str = "/api/v1"):
self.app = app
self.api_prefix = api_prefix
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http" or scope.get("method") != "OPTIONS":
await self.app(scope, receive, send)
return
headers = Headers(scope=scope)
origin = headers.get("origin")
requested_method = headers.get("access-control-request-method")
if origin and requested_method:
response = await build_public_embed_preflight_response(
scope.get("path", ""), origin, requested_method, self.api_prefix
)
if response is not None:
await response(scope, receive, send)
return
await self.app(scope, receive, send)
@router.post("/init", response_model=InitEmbedResponse)
async def initialize_embed_session(request: Request, init_request: InitEmbedRequest):
async def initialize_embed_session(
request: Request, init_request: InitEmbedRequest, response: Response
):
"""Initialize an embed session with token validation and domain checking.
This endpoint:
@ -158,6 +299,9 @@ async def initialize_embed_session(request: Request, init_request: InitEmbedRequ
)
raise HTTPException(status_code=403, detail=f"Domain not allowed: {origin}")
if origin:
_allow_embed_origin(response, origin)
# Create workflow run
try:
workflow_run = await db_client.create_workflow_run(
@ -204,8 +348,19 @@ async def initialize_embed_session(request: Request, init_request: InitEmbedRequ
)
@router.options("/config/{token}")
async def options_embed_config(token: str, request: Request):
"""Fallback OPTIONS handler for the embed config endpoint.
Browser preflights include Access-Control-Request-Method and are handled by
PublicEmbedCORSMiddleware before global CORS. This keeps non-conformant
OPTIONS requests on the same validation path.
"""
return await _config_preflight_response(token, request.headers.get("origin", ""))
@router.get("/config/{token}", response_model=EmbedConfigResponse)
async def get_embed_config(token: str, request: Request):
async def get_embed_config(token: str, request: Request, response: Response):
"""Get embed configuration without creating a session.
This endpoint is used to fetch widget configuration for display purposes
@ -226,6 +381,11 @@ async def get_embed_config(token: str, request: Request):
if not validate_origin(origin, embed_token.allowed_domains or []):
raise HTTPException(status_code=403, detail=f"Domain not allowed: {origin}")
# Set CORS header explicitly; the global CORSMiddleware covers only
# first-party origins; this endpoint is fetched by external embed sites.
if origin:
_allow_embed_origin(response, origin)
# Extract settings with defaults
settings = embed_token.settings or {}
@ -243,24 +403,20 @@ async def get_embed_config(token: str, request: Request):
@router.options("/init")
async def options_init(request: Request):
"""Handle CORS preflight for init endpoint"""
"""Fallback OPTIONS handler for init endpoint."""
# Browser preflights are handled by PublicEmbedCORSMiddleware before global CORS.
# For init endpoint, we need to check the token in the request body
# But OPTIONS requests don't have body, so we'll be permissive
# The actual validation happens in the POST request
origin = request.headers.get("origin", "*")
return Response(
headers={
"Access-Control-Allow-Origin": origin,
"Access-Control-Allow-Methods": "POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Origin",
"Access-Control-Max-Age": "86400",
}
)
return _cors_response(origin, "POST, OPTIONS")
@router.get("/turn-credentials/{session_token}", response_model=TurnCredentialsResponse)
async def get_public_turn_credentials(session_token: str, request: Request):
async def get_public_turn_credentials(
session_token: str, request: Request, response: Response
):
"""Get TURN credentials for an embed session.
This endpoint allows embedded widgets to obtain TURN server credentials
@ -295,6 +451,9 @@ async def get_public_turn_credentials(session_token: str, request: Request):
)
raise HTTPException(status_code=403, detail=f"Domain not allowed: {origin}")
if origin:
_allow_embed_origin(response, origin)
# Check if TURN is configured
if not TURN_SECRET:
raise HTTPException(
@ -316,63 +475,8 @@ async def get_public_turn_credentials(session_token: str, request: Request):
@router.options("/turn-credentials/{session_token}")
async def options_turn_credentials(request: Request, session_token: str):
"""Handle CORS preflight for TURN credentials endpoint"""
origin = request.headers.get("origin", "*")
# Try to validate the session token and get allowed domains
allowed_origin = origin
try:
embed_session = await db_client.get_embed_session_by_token(session_token)
if embed_session:
embed_token = await db_client.get_embed_token_by_id(
embed_session.embed_token_id
)
if embed_token:
# Check if origin is in allowed domains (empty means allow all)
if validate_origin(origin, embed_token.allowed_domains or []):
allowed_origin = origin
else:
allowed_origin = ""
except Exception:
# On error, be permissive for OPTIONS
pass
return Response(
headers={
"Access-Control-Allow-Origin": allowed_origin,
"Access-Control-Allow-Methods": "GET, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
"Access-Control-Max-Age": "86400",
}
)
@router.options("/config/{token}")
async def options_config(request: Request, token: str):
"""Handle CORS preflight for config endpoint"""
# Get origin header
origin = request.headers.get("origin", "*")
# Try to validate the token and get allowed domains
allowed_origin = origin
try:
embed_token = await db_client.get_embed_token_by_token(token)
if embed_token and embed_token.is_active:
# Check if origin is in allowed domains
if validate_origin(origin, embed_token.allowed_domains or []):
allowed_origin = origin
else:
# If not allowed, don't include the origin
allowed_origin = ""
except Exception:
# On error, be permissive for OPTIONS
pass
return Response(
headers={
"Access-Control-Allow-Origin": allowed_origin,
"Access-Control-Allow-Methods": "GET, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
"Access-Control-Max-Age": "86400",
}
"""Fallback OPTIONS handler for TURN credentials endpoint."""
# Browser preflights are handled by PublicEmbedCORSMiddleware before global CORS.
return await _turn_credentials_preflight_response(
session_token, request.headers.get("origin", "")
)

View file

@ -15,6 +15,29 @@ from api.utils.credential_auth import build_auth_header
PRE_CALL_FETCH_TIMEOUT_SECONDS = 10
def _extract_initial_context(response_data: Dict[str, Any]) -> Dict[str, Any]:
"""Pull the context variables out of a pre-call fetch response.
The canonical key is ``initial_context``. The legacy ``dynamic_variables``
key is still accepted for backward compatibility, so existing endpoints
keep working; ``initial_context`` takes precedence when both are present.
Either key may appear at the top level or nested under ``call_inbound``:
{"call_inbound": {"initial_context": {...}}} | {"initial_context": {...}}
{"call_inbound": {"dynamic_variables": {...}}} | {"dynamic_variables": {...}}
"""
container = response_data.get("call_inbound")
if not isinstance(container, dict):
container = response_data
for key in ("initial_context", "dynamic_variables"):
value = container.get(key)
if isinstance(value, dict):
return value
return {}
async def execute_pre_call_fetch(
*,
url: str,
@ -77,24 +100,16 @@ async def execute_pre_call_fetch(
)
return {}
# Extract dynamic_variables from Retell-compatible response
# Supports: {call_inbound: {dynamic_variables: {...}}}
# or: {dynamic_variables: {...}}
dynamic_vars = {}
call_inbound = response_data.get("call_inbound")
if isinstance(call_inbound, dict):
dynamic_vars = call_inbound.get("dynamic_variables", {})
elif "dynamic_variables" in response_data:
dynamic_vars = response_data["dynamic_variables"]
if not isinstance(dynamic_vars, dict):
dynamic_vars = {}
# Extract the variables to merge into initial_context. Prefers
# the canonical `initial_context` key, falling back to the
# legacy `dynamic_variables` key for backward compatibility.
initial_context_vars = _extract_initial_context(response_data)
logger.info(
f"Pre-call fetch: success ({response.status_code}), "
f"dynamic_variables keys: {list(dynamic_vars.keys())}"
f"initial_context keys: {list(initial_context_vars.keys())}"
)
return dynamic_vars
return initial_context_vars
else:
logger.warning(
f"Pre-call fetch: HTTP {response.status_code} - "

View file

@ -6,9 +6,8 @@ provider registry — see ProviderSpec.router.
import json
from datetime import UTC, datetime
from typing import Optional
from fastapi import APIRouter, Header, Request
from fastapi import APIRouter, HTTPException, Request
from loguru import logger
from pipecat.utils.run_context import set_current_run_id
from starlette.responses import HTMLResponse
@ -29,6 +28,30 @@ from api.utils.telephony_helper import (
router = APIRouter()
async def _verify_vobiz_callback(
provider,
webhook_url: str,
callback_data: dict,
headers: dict,
raw_body: str,
*,
log_prefix: str,
) -> None:
"""Verify a Vobiz callback signature, failing closed.
Vobiz signs every callback, so a missing signature header is an invalid
request ``provider.verify_inbound_signature`` returns ``False`` for both
missing and forged signatures. Reject with HTTP 403 (per Vobiz's
callback-validation docs) so the caller never reaches status processing.
"""
is_valid = await provider.verify_inbound_signature(
webhook_url, callback_data, headers, raw_body
)
if not is_valid:
logger.warning(f"{log_prefix} Invalid or missing Vobiz callback signature")
raise HTTPException(status_code=403, detail="Invalid webhook signature")
@router.post("/vobiz-xml", include_in_schema=False)
async def handle_vobiz_xml_webhook(
workflow_id: int, user_id: int, workflow_run_id: int, organization_id: int
@ -65,8 +88,6 @@ async def handle_vobiz_xml_webhook(
async def handle_vobiz_hangup_callback(
workflow_run_id: int,
request: Request,
x_vobiz_signature: Optional[str] = Header(None),
x_vobiz_timestamp: Optional[str] = Header(None),
):
"""Handle Vobiz hangup callback (sent when call ends).
@ -75,82 +96,23 @@ async def handle_vobiz_hangup_callback(
"""
set_current_run_id(workflow_run_id)
# Logging all headers and body to understand what Vobiz actually sends
all_headers = dict(request.headers)
logger.info(
f"[run {workflow_run_id}] Vobiz hangup callback - Headers: {json.dumps(all_headers)}"
)
# Parse the callback data from the raw body so signed webhooks can verify
# the exact bytes Vobiz sent without draining the request stream first.
callback_data, raw_body = await parse_webhook_request(request)
# TODO: Remove this debug logging after Vobiz team clarifies webhook authentication
logger.info(
f"[run {workflow_run_id}] Vobiz hangup callback - Body: {json.dumps(callback_data)}"
)
logger.info(
f"[run {workflow_run_id}] Received Vobiz hangup callback {json.dumps(callback_data)}"
)
# Verify signature if Vobiz provided any supported signature header.
has_vobiz_signature = any(
header in all_headers
for header in (
"x-vobiz-signature-v3",
"x-vobiz-signature-ma-v3",
"x-vobiz-signature-v2",
"x-vobiz-signature-ma-v2",
)
)
if has_vobiz_signature:
# We need the workflow run to get organization for provider credentials
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.warning(
f"[run {workflow_run_id}] Workflow run not found for signature verification"
)
return {"status": "error", "reason": "workflow_run_not_found"}
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
if not workflow:
logger.warning(
f"[run {workflow_run_id}] Workflow not found for signature verification"
)
return {"status": "error", "reason": "workflow_not_found"}
provider = await get_telephony_provider_for_run(
workflow_run, workflow.organization_id
)
# Verify signature
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = f"{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/{workflow_run_id}"
is_valid = await provider.verify_inbound_signature(
webhook_url,
callback_data,
all_headers,
raw_body,
)
if not is_valid:
logger.warning(
f"[run {workflow_run_id}] Invalid Vobiz hangup callback signature"
)
return {"status": "error", "reason": "invalid_signature"}
logger.info(f"[run {workflow_run_id}] Vobiz hangup callback signature verified")
else:
# Get workflow run for processing (signature verification already got it if needed)
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.warning(
f"[run {workflow_run_id}] Workflow run not found for Vobiz hangup callback"
)
return {"status": "ignored", "reason": "workflow_run_not_found"}
# Get workflow and provider
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
if not workflow:
logger.warning(f"[run {workflow_run_id}] Workflow not found")
@ -160,6 +122,21 @@ async def handle_vobiz_hangup_callback(
workflow_run, workflow.organization_id
)
# Fail closed: Vobiz signs every callback, so reject unsigned/forged ones
# before they can mutate call state.
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = (
f"{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/{workflow_run_id}"
)
await _verify_vobiz_callback(
provider,
webhook_url,
callback_data,
all_headers,
raw_body,
log_prefix=f"[run {workflow_run_id}]",
)
logger.debug(
f"[run {workflow_run_id}] Processing Vobiz hangup with provider: {provider.PROVIDER_NAME}"
)
@ -167,10 +144,6 @@ async def handle_vobiz_hangup_callback(
# Parse the callback data into generic format
parsed_data = provider.parse_status_callback(callback_data)
logger.debug(
f"[run {workflow_run_id}] Parsed Vobiz callback data: {json.dumps(parsed_data)}"
)
# Create StatusCallbackRequest from parsed data
status_update = StatusCallbackRequest(
call_id=parsed_data["call_id"],
@ -194,8 +167,6 @@ async def handle_vobiz_hangup_callback(
async def handle_vobiz_ring_callback(
workflow_run_id: int,
request: Request,
x_vobiz_signature: Optional[str] = Header(None),
x_vobiz_timestamp: Optional[str] = Header(None),
):
"""Handle Vobiz ring callback (sent when call starts ringing).
@ -204,84 +175,46 @@ async def handle_vobiz_ring_callback(
"""
set_current_run_id(workflow_run_id)
# Logging all headers and body to understand what Vobiz actually sends
all_headers = dict(request.headers)
logger.info(
f"[run {workflow_run_id}] Vobiz ring callback - Headers: {json.dumps(all_headers)}"
)
# Parse the callback data from the raw body so signed webhooks can verify
# the exact bytes Vobiz sent without draining the request stream first.
callback_data, raw_body = await parse_webhook_request(request)
# TODO: Remove this debug logging after Vobiz team clarifies webhook authentication
logger.info(
f"[run {workflow_run_id}] Vobiz ring callback - Body: {json.dumps(callback_data)}"
)
logger.info(
f"[run {workflow_run_id}] Received Vobiz ring callback {json.dumps(callback_data)}"
)
# Verify signature if Vobiz provided any supported signature header.
has_vobiz_signature = any(
header in all_headers
for header in (
"x-vobiz-signature-v3",
"x-vobiz-signature-ma-v3",
"x-vobiz-signature-v2",
"x-vobiz-signature-ma-v2",
)
)
if has_vobiz_signature:
# We need the workflow run to get organization for provider credentials
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.warning(
f"[run {workflow_run_id}] Workflow run not found for signature verification"
)
return {"status": "error", "reason": "workflow_run_not_found"}
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
if not workflow:
logger.warning(
f"[run {workflow_run_id}] Workflow not found for signature verification"
)
return {"status": "error", "reason": "workflow_not_found"}
provider = await get_telephony_provider_for_run(
workflow_run, workflow.organization_id
)
# Verify signature
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = (
f"{backend_endpoint}/api/v1/telephony/vobiz/ring-callback/{workflow_run_id}"
)
is_valid = await provider.verify_inbound_signature(
webhook_url,
callback_data,
all_headers,
raw_body,
)
if not is_valid:
logger.warning(
f"[run {workflow_run_id}] Invalid Vobiz ring callback signature"
)
return {"status": "error", "reason": "invalid_signature"}
logger.info(f"[run {workflow_run_id}] Vobiz ring callback signature verified")
else:
# Get workflow run for processing (signature verification already got it if needed)
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.warning(
f"[run {workflow_run_id}] Workflow run not found for Vobiz ring callback"
)
return {"status": "ignored", "reason": "workflow_run_not_found"}
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
if not workflow:
logger.warning(f"[run {workflow_run_id}] Workflow not found")
return {"status": "ignored", "reason": "workflow_not_found"}
provider = await get_telephony_provider_for_run(
workflow_run, workflow.organization_id
)
# Fail closed: reject unsigned/forged ring callbacks before logging them.
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = (
f"{backend_endpoint}/api/v1/telephony/vobiz/ring-callback/{workflow_run_id}"
)
await _verify_vobiz_callback(
provider,
webhook_url,
callback_data,
all_headers,
raw_body,
log_prefix=f"[run {workflow_run_id}]",
)
# Log the ringing event
telephony_callback_logs = workflow_run.logs.get("telephony_status_callbacks", [])
ring_log = {
@ -308,15 +241,10 @@ async def handle_vobiz_ring_callback(
async def handle_vobiz_hangup_callback_by_workflow(
workflow_id: int,
request: Request,
x_vobiz_signature: Optional[str] = Header(None),
x_vobiz_timestamp: Optional[str] = Header(None),
):
"""Handle Vobiz hangup callback with workflow_id - finds workflow run by call_id."""
all_headers = dict(request.headers)
logger.info(
f"[workflow {workflow_id}] Vobiz hangup callback - Headers: {json.dumps(all_headers)}"
)
try:
callback_data, raw_body = await parse_webhook_request(request)
@ -364,35 +292,18 @@ async def handle_vobiz_hangup_callback_by_workflow(
workflow_run, workflow.organization_id
)
has_vobiz_signature = any(
header in all_headers
for header in (
"x-vobiz-signature-v3",
"x-vobiz-signature-ma-v3",
"x-vobiz-signature-v2",
"x-vobiz-signature-ma-v2",
)
# Fail closed: Vobiz signs every callback, so reject unsigned/forged ones
# before they can mutate call state.
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = f"{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/workflow/{workflow_id}"
await _verify_vobiz_callback(
provider,
webhook_url,
callback_data,
all_headers,
raw_body,
log_prefix=f"[workflow {workflow_id}]",
)
if has_vobiz_signature:
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = f"{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/workflow/{workflow_id}"
is_valid = await provider.verify_inbound_signature(
webhook_url,
callback_data,
all_headers,
raw_body,
)
if not is_valid:
logger.warning(
f"[workflow {workflow_id}] Invalid Vobiz hangup callback signature"
)
return {"status": "error", "message": "invalid_signature"}
logger.info(
f"[workflow {workflow_id}] Vobiz hangup callback signature verified"
)
try:
parsed_data = provider.parse_status_callback(callback_data)

View file

@ -6,11 +6,13 @@ from unittest.mock import AsyncMock, patch
from urllib.parse import urlencode
import pytest
from fastapi import HTTPException
from starlette.requests import Request
from api.services.telephony.providers.vobiz.provider import VobizProvider
from api.services.telephony.providers.vobiz.routes import (
handle_vobiz_hangup_callback,
handle_vobiz_hangup_callback_by_workflow,
handle_vobiz_ring_callback,
)
@ -225,3 +227,154 @@ async def test_vobiz_verify_inbound_signature_rejects_missing_signature():
{},
{},
)
@pytest.mark.asyncio
async def test_vobiz_hangup_callback_rejects_missing_signature():
"""An unsigned hangup callback must be rejected before status processing."""
provider = _provider()
form_data = {
"CallUUID": "call-123",
"CallStatus": "completed",
"From": "15551230001",
"To": "15551230002",
"Direction": "outbound",
"Duration": "12",
}
# No x-vobiz-signature-* headers — the callback is unsigned.
request = _request(
path="/api/v1/telephony/vobiz/hangup-callback/123",
form_data=form_data,
)
with (
patch("api.services.telephony.providers.vobiz.routes.db_client") as db_client,
patch(
"api.services.telephony.providers.vobiz.routes.get_telephony_provider_for_run",
new_callable=AsyncMock,
return_value=provider,
),
patch(
"api.services.telephony.providers.vobiz.routes.get_backend_endpoints",
new_callable=AsyncMock,
return_value=("https://example.test", "wss://example.test"),
),
patch(
"api.services.telephony.providers.vobiz.routes._process_status_update",
new_callable=AsyncMock,
) as process_status,
):
db_client.get_workflow_run_by_id = AsyncMock(
return_value=SimpleNamespace(workflow_id=7)
)
db_client.get_workflow_by_id = AsyncMock(
return_value=SimpleNamespace(organization_id=11)
)
with pytest.raises(HTTPException) as exc_info:
await handle_vobiz_hangup_callback(
workflow_run_id=123,
request=request,
)
assert exc_info.value.status_code == 403
process_status.assert_not_awaited()
@pytest.mark.asyncio
async def test_vobiz_ring_callback_rejects_missing_signature():
"""An unsigned ring callback must be rejected before it is logged."""
provider = _provider()
form_data = {
"CallUUID": "call-123",
"CallStatus": "ringing",
"From": "15551230001",
"To": "15551230002",
}
# No x-vobiz-signature-* headers — the callback is unsigned.
request = _request(
path="/api/v1/telephony/vobiz/ring-callback/123",
form_data=form_data,
)
workflow_run = SimpleNamespace(workflow_id=7, logs={})
with (
patch("api.services.telephony.providers.vobiz.routes.db_client") as db_client,
patch(
"api.services.telephony.providers.vobiz.routes.get_telephony_provider_for_run",
new_callable=AsyncMock,
return_value=provider,
),
patch(
"api.services.telephony.providers.vobiz.routes.get_backend_endpoints",
new_callable=AsyncMock,
return_value=("https://example.test", "wss://example.test"),
),
):
db_client.get_workflow_run_by_id = AsyncMock(return_value=workflow_run)
db_client.get_workflow_by_id = AsyncMock(
return_value=SimpleNamespace(organization_id=11)
)
db_client.update_workflow_run = AsyncMock()
with pytest.raises(HTTPException) as exc_info:
await handle_vobiz_ring_callback(
workflow_run_id=123,
request=request,
)
assert exc_info.value.status_code == 403
db_client.update_workflow_run.assert_not_awaited()
@pytest.mark.asyncio
async def test_vobiz_hangup_callback_by_workflow_rejects_missing_signature():
"""An unsigned by-workflow hangup callback must be rejected before processing."""
provider = _provider()
form_data = {
"CallUUID": "call-123",
"CallStatus": "completed",
"From": "15551230001",
"To": "15551230002",
"Direction": "outbound",
"Duration": "12",
}
# No x-vobiz-signature-* headers — the callback is unsigned.
request = _request(
path="/api/v1/telephony/vobiz/hangup-callback/workflow/7",
form_data=form_data,
)
with (
patch("api.services.telephony.providers.vobiz.routes.db_client") as db_client,
patch(
"api.services.telephony.providers.vobiz.routes.get_telephony_provider_for_run",
new_callable=AsyncMock,
return_value=provider,
),
patch(
"api.services.telephony.providers.vobiz.routes.get_backend_endpoints",
new_callable=AsyncMock,
return_value=("https://example.test", "wss://example.test"),
),
patch(
"api.services.telephony.providers.vobiz.routes._process_status_update",
new_callable=AsyncMock,
) as process_status,
):
db_client.get_workflow_by_id = AsyncMock(
return_value=SimpleNamespace(organization_id=11)
)
db_client.get_workflow_run_by_call_id = AsyncMock(
return_value=SimpleNamespace(id=123, workflow_id=7)
)
with pytest.raises(HTTPException) as exc_info:
await handle_vobiz_hangup_callback_by_workflow(
workflow_id=7,
request=request,
)
assert exc_info.value.status_code == 403
process_status.assert_not_awaited()

View file

@ -0,0 +1,66 @@
from api.services.pipecat.pre_call_fetch import _extract_initial_context
class TestExtractInitialContext:
"""Tests for _extract_initial_context, the pre-call fetch response parser."""
def test_initial_context_nested_under_call_inbound(self):
"""The canonical `initial_context` key nested under `call_inbound`."""
response = {"call_inbound": {"initial_context": {"customer_name": "Jane"}}}
assert _extract_initial_context(response) == {"customer_name": "Jane"}
def test_initial_context_at_top_level(self):
"""The canonical `initial_context` key at the top level."""
response = {"initial_context": {"customer_name": "Jane"}}
assert _extract_initial_context(response) == {"customer_name": "Jane"}
def test_legacy_dynamic_variables_nested(self):
"""The legacy `dynamic_variables` key still works nested under `call_inbound`."""
response = {"call_inbound": {"dynamic_variables": {"customer_name": "Jane"}}}
assert _extract_initial_context(response) == {"customer_name": "Jane"}
def test_legacy_dynamic_variables_at_top_level(self):
"""The legacy `dynamic_variables` key still works at the top level."""
response = {"dynamic_variables": {"customer_name": "Jane"}}
assert _extract_initial_context(response) == {"customer_name": "Jane"}
def test_initial_context_takes_precedence_over_legacy(self):
"""When both keys are present, `initial_context` wins."""
response = {
"call_inbound": {
"initial_context": {"source": "new"},
"dynamic_variables": {"source": "legacy"},
}
}
assert _extract_initial_context(response) == {"source": "new"}
def test_falls_back_to_legacy_when_initial_context_not_a_dict(self):
"""A non-dict `initial_context` falls back to `dynamic_variables`."""
response = {
"initial_context": None,
"dynamic_variables": {"customer_name": "Jane"},
}
assert _extract_initial_context(response) == {"customer_name": "Jane"}
def test_nested_values_preserved(self):
"""Nested objects pass through untouched for dot-notation access."""
response = {
"call_inbound": {
"initial_context": {"customer": {"address": {"city": "LA"}}}
}
}
assert _extract_initial_context(response) == {
"customer": {"address": {"city": "LA"}}
}
def test_empty_when_no_known_keys(self):
"""A response with neither key yields an empty dict."""
assert _extract_initial_context({"call_inbound": {"agent_id": 1}}) == {}
def test_empty_when_call_inbound_missing(self):
"""No `call_inbound` and no top-level keys yields an empty dict."""
assert _extract_initial_context({}) == {}
def test_non_dict_vars_yield_empty(self):
"""A non-dict value under a known key yields an empty dict."""
assert _extract_initial_context({"initial_context": "nope"}) == {}

View file

@ -0,0 +1,274 @@
from types import SimpleNamespace
import pytest
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.testclient import TestClient
from api.routes.public_embed import PublicEmbedCORSMiddleware, router
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["https://app.dograh.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(PublicEmbedCORSMiddleware, api_prefix="/api/v1")
app.include_router(router, prefix="/api/v1")
client = TestClient(app, raise_server_exceptions=False)
_ACTIVE_TOKEN = SimpleNamespace(
id=10,
is_active=True,
expires_at=None,
allowed_domains=[],
workflow_id=1,
created_by=7,
usage_limit=None,
usage_count=0,
settings={},
)
_RESTRICTED_TOKEN = SimpleNamespace(
id=20,
is_active=True,
expires_at=None,
allowed_domains=["allowed.example.com"],
workflow_id=2,
created_by=7,
usage_limit=None,
usage_count=0,
settings={},
)
_LOCALHOST_TOKEN = SimpleNamespace(
id=30,
is_active=True,
expires_at=None,
allowed_domains=["localhost:3000", "localhost:3020"],
workflow_id=3,
created_by=7,
usage_limit=None,
usage_count=0,
settings={},
)
@pytest.fixture(autouse=True)
def _patch_db(monkeypatch):
async def _get_token(token):
if token == "valid":
return _ACTIVE_TOKEN
if token == "restricted":
return _RESTRICTED_TOKEN
if token == "localhost":
return _LOCALHOST_TOKEN
return None
async def _get_token_by_id(token_id):
if token_id == _ACTIVE_TOKEN.id:
return _ACTIVE_TOKEN
if token_id == _RESTRICTED_TOKEN.id:
return _RESTRICTED_TOKEN
if token_id == _LOCALHOST_TOKEN.id:
return _LOCALHOST_TOKEN
return None
async def _get_session(session_token):
if session_token == "session-valid":
return SimpleNamespace(embed_token_id=_ACTIVE_TOKEN.id, expires_at=None)
if session_token == "session-restricted":
return SimpleNamespace(embed_token_id=_RESTRICTED_TOKEN.id, expires_at=None)
return None
async def _create_workflow_run(**_kwargs):
return SimpleNamespace(id=123)
async def _noop(*_args, **_kwargs):
return None
monkeypatch.setattr(
"api.routes.public_embed.db_client.get_embed_token_by_token",
_get_token,
)
monkeypatch.setattr(
"api.routes.public_embed.db_client.get_embed_token_by_id",
_get_token_by_id,
)
monkeypatch.setattr(
"api.routes.public_embed.db_client.get_embed_session_by_token",
_get_session,
)
monkeypatch.setattr(
"api.routes.public_embed.db_client.create_workflow_run",
_create_workflow_run,
)
monkeypatch.setattr(
"api.routes.public_embed.db_client.create_embed_session",
_noop,
)
monkeypatch.setattr(
"api.routes.public_embed.db_client.increment_embed_token_usage",
_noop,
)
monkeypatch.setattr("api.routes.public_embed.TURN_SECRET", "test-secret")
monkeypatch.setattr(
"api.routes.public_embed.generate_turn_credentials",
lambda _user_id: {
"username": "turn-user",
"password": "turn-password",
"ttl": 3600,
"uris": ["turn:example.com:3478"],
},
)
def _assert_embed_cors(resp, origin: str):
assert resp.headers.get("access-control-allow-origin") == origin
assert "origin" in {
value.strip().lower() for value in resp.headers.get("vary", "").split(",")
}
def test_options_config_returns_acao_for_allowed_origin():
origin = "https://mysite.vercel.app"
resp = client.options(
"/api/v1/public/embed/config/valid",
headers={
"Origin": origin,
"Access-Control-Request-Method": "GET",
},
)
assert resp.status_code == 200
_assert_embed_cors(resp, origin)
def test_options_config_accepts_allowed_localhost_port():
origin = "http://localhost:3020"
resp = client.options(
"/api/v1/public/embed/config/localhost",
headers={
"Origin": origin,
"Access-Control-Request-Method": "GET",
},
)
assert resp.status_code == 200
_assert_embed_cors(resp, origin)
def test_options_config_rejects_unknown_token():
resp = client.options(
"/api/v1/public/embed/config/unknown",
headers={
"Origin": "https://mysite.vercel.app",
"Access-Control-Request-Method": "GET",
},
)
assert resp.status_code == 403
def test_options_config_rejects_disallowed_origin():
resp = client.options(
"/api/v1/public/embed/config/restricted",
headers={
"Origin": "https://notallowed.example.com",
"Access-Control-Request-Method": "GET",
},
)
assert resp.status_code == 403
def test_get_config_includes_acao_header():
origin = "https://mysite.vercel.app"
resp = client.get(
"/api/v1/public/embed/config/valid",
headers={"Origin": origin},
)
assert resp.status_code == 200
_assert_embed_cors(resp, origin)
def test_get_config_accepts_allowed_localhost_port():
origin = "http://localhost:3020"
resp = client.get(
"/api/v1/public/embed/config/localhost",
headers={"Origin": origin},
)
assert resp.status_code == 200
_assert_embed_cors(resp, origin)
def test_get_config_rejects_unlisted_localhost_port():
resp = client.get(
"/api/v1/public/embed/config/localhost",
headers={"Origin": "http://localhost:3021"},
)
assert resp.status_code == 403
def test_get_config_rejects_disallowed_origin():
resp = client.get(
"/api/v1/public/embed/config/restricted",
headers={"Origin": "https://notallowed.example.com"},
)
assert resp.status_code == 403
def test_init_includes_acao_header():
origin = "https://mysite.vercel.app"
resp = client.post(
"/api/v1/public/embed/init",
headers={"Origin": origin},
json={"token": "valid"},
)
assert resp.status_code == 200
_assert_embed_cors(resp, origin)
def test_turn_credentials_includes_acao_header():
origin = "https://mysite.vercel.app"
resp = client.get(
"/api/v1/public/embed/turn-credentials/session-valid",
headers={"Origin": origin},
)
assert resp.status_code == 200
_assert_embed_cors(resp, origin)
def test_options_init_returns_acao_for_allowed_origin():
origin = "https://mysite.vercel.app"
resp = client.options(
"/api/v1/public/embed/init",
headers={
"Origin": origin,
"Access-Control-Request-Method": "POST",
},
)
assert resp.status_code == 200
_assert_embed_cors(resp, origin)
def test_options_turn_credentials_returns_acao_for_allowed_origin():
origin = "https://mysite.vercel.app"
resp = client.options(
"/api/v1/public/embed/turn-credentials/session-valid",
headers={
"Origin": origin,
"Access-Control-Request-Method": "GET",
},
)
assert resp.status_code == 200
_assert_embed_cors(resp, origin)
def test_options_turn_credentials_rejects_disallowed_origin():
resp = client.options(
"/api/v1/public/embed/turn-credentials/session-restricted",
headers={
"Origin": "https://notallowed.example.com",
"Access-Control-Request-Method": "GET",
},
)
assert resp.status_code == 403

View file

@ -3,7 +3,11 @@ services:
image: pgvector/pgvector:pg17
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
# Sourced from .env. Defaults to "postgres"
# NOTE: changing this on an existing install does NOT
# re-key the database — the password is baked into the volume on first init.
# You can manually change the password using psql in the container
POSTGRES_PASSWORD: "${POSTGRES_PASSWORD:-postgres}"
POSTGRES_DB: postgres
logging:
driver: "json-file"
@ -136,7 +140,7 @@ services:
BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}"
# Database configuration (using containerized postgres)
DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres"
DATABASE_URL: "postgresql+asyncpg://postgres:${POSTGRES_PASSWORD:-postgres}@postgres:5432/postgres"
# Redis configuration (using containerized redis)
REDIS_URL: "redis://:redissecret@redis:6379"

View file

@ -18,20 +18,10 @@ initial_context ──► Agent ──► gathered_context
Data available to the agent before the call starts — the contact's name, account details, appointment information, anything the agent should know upfront. It can be set from several places:
- **API trigger** — pass it in the request body when calling `POST /public/agent/{uuid}` or `POST /telephony/initiate-call`
- **Campaign CSV** — columns beyond `phone_number` automatically become `initial_context` fields for each contact's call
- **Dashboard** — set default template context variables on the agent, used when no external context is provided
```json
{
"phone_number": "+14155550100",
"initial_context": {
"customer_name": "Jane Smith",
"plan": "premium",
"renewal_date": "April 1"
}
}
```
- **[API trigger](/voice-agent/api-trigger)** — pass it in the request body when calling `POST /public/agent/{uuid}` or `POST /telephony/initiate-call`
- **[Campaign CSV](/core-concepts/campaigns)** — columns beyond `phone_number` automatically become `initial_context` fields for each contact's call
- **[Pre-call data fetch](/voice-agent/pre-call-data-fetch)** — enrich the context with data from your CRM or ERP via an HTTP call as the call starts, before the agent speaks
- **[Agent Settings](/voice-agent/template-variables#using-template-variables-for-testing)** — set template context variables on the agent for testing; they're included in test calls from the workflow editor and ignored on production calls
### Template variables
@ -103,7 +93,7 @@ Data the agent collects *during* the call. You configure what to extract in the
<img src="../images/extracted_variables.png" alt="Extracted variables" style={{border: "1px solid #d1d5db", borderRadius: "8px", maxWidth: "100%"}} />
`gathered_context` is returned in the run record after the call completes and is available in [webhook payloads](/developer/webhooks) for downstream processing.
`gathered_context` is returned in the run record after the call completes and is available in [webhook payloads](/developer/webhooks) for downstream processing. It is **not** available as a template variable in Agent prompts — prompts can only reference `initial_context` fields.
## Data flow example

Binary file not shown.

After

Width:  |  Height:  |  Size: 182 KiB

View file

@ -118,7 +118,7 @@ For example, if your request includes:
}
```
You can reference the user's name in your prompt as `{{initial_context.user.name}}`.
You can reference the user's name in your agent prompt as `{{user.name}}` — in Agent prompts, `initial_context` fields are referenced directly by name (not prefixed with `initial_context.`). See [template variables](/voice-agent/template-variables) for the exact syntax in prompts versus webhook payloads.
See [Context & Variables](/core-concepts/context-and-variables) for more on how data flows through a call.

View file

@ -11,7 +11,7 @@ Pre-Call Data Fetch allows you to enrich the call context with external data bef
1. A call arrives (inbound) or is initiated (outbound).
2. Dograh sends a **POST** request to your configured endpoint with a standardized payload.
3. The caller hears a ring-back tone while waiting for the response.
4. Your API responds with a JSON object containing `dynamic_variables`.
4. Your API responds with a JSON object containing an `initial_context` object.
5. The variables are merged into the call's initial context.
6. The voice agent starts with full access to the fetched data via `{{variable_name}}` syntax.
@ -50,12 +50,12 @@ The `Content-Type` header is set to `application/json`. If you configured a cred
## Expected Response Format
Your API should return a **JSON object** with a `2xx` status code. The variables to inject into the call context should be placed inside the `dynamic_variables` key:
Your API should return a **JSON object** with a `2xx` status code. The variables to inject into the call context should be placed inside the `initial_context` key:
```json
{
"call_inbound": {
"dynamic_variables": {
"initial_context": {
"customer_name": "Jane Doe",
"account_status": "active",
"loyalty_tier": "gold",
@ -65,34 +65,38 @@ Your API should return a **JSON object** with a `2xx` status code. The variables
}
```
You can also place `dynamic_variables` at the top level:
You can also place `initial_context` at the top level:
```json
{
"dynamic_variables": {
"initial_context": {
"customer_name": "Jane Doe",
"account_status": "active"
}
}
```
<Note>
The legacy `dynamic_variables` key is still accepted as a drop-in alias for `initial_context`, so existing integrations keep working without any changes. Use `initial_context` for new integrations. If a response contains both keys, `initial_context` takes precedence.
</Note>
After the response is received, you can reference these values anywhere template variables are supported:
- **Greeting**: `Hello {{customer_name}}, thank you for calling!`
- **Prompt**: `The customer is a {{loyalty_tier}} member with {{open_tickets}} open support tickets.`
<Note>
If the response is not a valid JSON object, does not contain `dynamic_variables`, or the request fails or times out, the call proceeds normally without the additional context. The pre-call fetch never blocks or fails a call.
If the response is not a valid JSON object, does not contain `initial_context` (or the legacy `dynamic_variables`), or the request fails or times out, the call proceeds normally without the additional context. The pre-call fetch never blocks or fails a call.
</Note>
## Nested Variables
If your `dynamic_variables` contain nested objects, you can access them using dot notation:
If your `initial_context` contains nested objects, you can access them using dot notation:
```json
{
"call_inbound": {
"dynamic_variables": {
"initial_context": {
"customer": {
"name": "Jane Doe",
"address": {
@ -153,7 +157,7 @@ app.post("/dograh/pre-call", async (req, res) => {
res.json({
call_inbound: {
dynamic_variables: {
initial_context: {
customer_name: customer.name,
account_status: customer.status,
loyalty_tier: customer.tier,

View file

@ -4,13 +4,23 @@ description: "You can use Template Variables in your prompts for your Agent node
---
### Template Rendering
You can reference template variables which is passed as [`initial_context`](/core-concepts/context-and-variables#initial_context) either using the [API Trigger](/voice-agent/api-trigger) or when uploading a Sheet for a [campaign](/core-concepts/campaigns). You can also use any extracted variable as [`gathered_context`](/core-concepts/context-and-variables#gathered_context)
The template rendering can take nested values.
You reference template variables with `{{double_brace}}` syntax. The data comes from [`initial_context`](/core-concepts/context-and-variables#initial_context) — set via the [API Trigger](/voice-agent/api-trigger), a [campaign](/core-concepts/campaigns) sheet, or a [Pre-Call Data Fetch](/voice-agent/pre-call-data-fetch) that enriches the context when the call starts — and, in Webhook payloads only, from [`gathered_context`](/core-concepts/context-and-variables#gathered_context) (variables extracted during the call).
Example: If the initial context is
**The syntax depends on where you use it:**
```
| Where | `initial_context` | `gathered_context` |
| --- | --- | --- |
| Agent node prompts | `{{field_name}}` (referenced directly) | Not available |
| Webhook Node payloads | `{{initial_context.field_name}}` | `{{gathered_context.field_name}}` |
#### Agent node prompts
In an Agent node prompt, reference each `initial_context` field **directly by name**. Nested values are supported with dot notation.
Example: if the initial context is
```json
{
"initial_context": {
"user": {
@ -20,14 +30,26 @@ Example: If the initial context is
}
```
You can write your prompt to access the user's name as below
write your prompt to access the user's name as below:
Prompt: `You are Alice, who is talking to {{initial_context.user.name}}.`
Prompt: `You are Alice, who is talking to {{user.name}}.`
<Note>
Variables extracted during the call (`gathered_context`) are **not** available in Agent prompts — a prompt can only reference `initial_context` fields. To act on extracted data, send it to a [Webhook Node](/voice-agent/webhook).
</Note>
#### Webhook Node payloads
When constructing a [Webhook Node](/voice-agent/webhook) payload, the context objects are nested under their names, so reference them with the `initial_context.` and `gathered_context.` prefixes:
Payload value: `{{initial_context.user.name}}` or `{{gathered_context.call_disposition}}`
### Using Template Variables for Testing
Template variables defined in your workflow **Settings > Context Variables** are included in test calls (both web and phone) made from the workflow editor. This is useful for simulating data that would normally come from telephony or an API trigger.
<img src="../images/template-variables.png" alt="Template Variables panel in workflow Settings, showing a customer_name variable and fields to add new key/value pairs" style={{border: "1px solid #d1d5db", borderRadius: "8px", maxWidth: "100%"}} />
For example, you can set `caller_number` and `called_number` as context variables to test [Pre-Call Data Fetch](/voice-agent/pre-call-data-fetch#testing-with-test-calls) without needing a real inbound call.
<Note>

View file

@ -243,6 +243,7 @@ if ($UseCoturn) {
Write-Info "[2/$TotalSteps] Creating environment file..."
$ossJwtSecret = New-HexSecret 32
$postgresPassword = New-HexSecret 32
$envLines = @(
'# Container registry for Dograh images'
@ -251,6 +252,11 @@ $envLines = @(
'# JWT secret for OSS authentication'
"OSS_JWT_SECRET=$ossJwtSecret"
''
'# PostgreSQL password. Used by the postgres container on first init and by'
"# the API's DATABASE_URL. Do not change after the first start — the password"
'# is baked into the postgres data volume when it is first created.'
"POSTGRES_PASSWORD=$postgresPassword"
''
'# Telemetry (set to false to disable)'
"ENABLE_TELEMETRY=$EnableTelemetry"
''

View file

@ -150,6 +150,7 @@ fi
ENV_STEP=$TOTAL_STEPS
echo -e "${BLUE}[$ENV_STEP/$TOTAL_STEPS] Creating environment file...${NC}"
OSS_JWT_SECRET=$(openssl rand -hex 32)
POSTGRES_PASSWORD=$(openssl rand -hex 32)
cat > .env << ENV_EOF
# Container registry for Dograh images
@ -158,6 +159,11 @@ REGISTRY=$REGISTRY
# JWT secret for OSS authentication
OSS_JWT_SECRET=$OSS_JWT_SECRET
# PostgreSQL password. Used by the postgres container on first init and by the
# API's DATABASE_URL. Do not change after the first start — the password is
# baked into the postgres data volume when it is first created.
POSTGRES_PASSWORD=$POSTGRES_PASSWORD
# Telemetry (set to false to disable)
ENABLE_TELEMETRY=$ENABLE_TELEMETRY

View file

@ -251,6 +251,7 @@ echo -e "${GREEN}✓ SSL certificates generated${NC}"
echo -e "${BLUE}[4/$TOTAL] Creating environment file...${NC}"
OSS_JWT_SECRET=$(openssl rand -hex 32)
POSTGRES_PASSWORD=$(openssl rand -hex 32)
cat > .env << ENV_EOF
# Remote deployments run with production signaling and HTTPS defaults
@ -276,6 +277,11 @@ FORCE_TURN_RELAY=$FORCE_TURN_RELAY
# JWT secret for OSS authentication
OSS_JWT_SECRET=$OSS_JWT_SECRET
# PostgreSQL password. Used by the postgres container on first init and by the
# API's DATABASE_URL. Do not change after the first start — the password is
# baked into the postgres data volume when it is first created.
POSTGRES_PASSWORD=$POSTGRES_PASSWORD
# Telemetry (set to false to disable)
ENABLE_TELEMETRY=$ENABLE_TELEMETRY

View file

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: dograh-openapi-rs5H7P.json
# timestamp: 2026-06-02T06:01:29+00:00
# filename: dograh-openapi-uraOZf.json
# timestamp: 2026-06-03T11:53:30+00:00
from __future__ import annotations

View file

@ -9,11 +9,6 @@ const nextConfig: NextConfig = {
},
async rewrites() {
return [
// API proxy for backend calls (excluding Next.js API routes)
{
source: "/api/:path((?!config|auth).*)*",
destination: `${process.env.BACKEND_URL || 'http://localhost:8000'}/api/:path*`,
},
{
source: "/ingest/static/:path*",
destination: "https://us-assets.i.posthog.com/static/:path*",

View file

@ -1,6 +1,6 @@
{
"name": "ui",
"version": "1.33.0",
"version": "1.34.0",
"private": true,
"scripts": {
"dev": "cross-env NODE_OPTIONS=--enable-source-maps next dev --turbopack",

View file

@ -0,0 +1,104 @@
import { NextRequest, NextResponse } from "next/server";
import { getServerBackendUrl } from "@/lib/apiClient";
export const dynamic = "force-dynamic";
export const runtime = "nodejs";
const HOP_BY_HOP_HEADERS = [
"connection",
"keep-alive",
"proxy-authenticate",
"proxy-authorization",
"te",
"trailer",
"transfer-encoding",
"upgrade",
];
function trimTrailingSlash(url: string) {
return url.endsWith("/") ? url.slice(0, -1) : url;
}
function buildBackendUrl(request: NextRequest) {
const backendUrl = trimTrailingSlash(getServerBackendUrl());
return `${backendUrl}${request.nextUrl.pathname}${request.nextUrl.search}`;
}
function createRequestHeaders(request: NextRequest) {
const headers = new Headers(request.headers);
for (const header of HOP_BY_HOP_HEADERS) {
headers.delete(header);
}
headers.delete("accept-encoding");
headers.delete("content-length");
headers.delete("host");
return headers;
}
function createResponseHeaders(response: Response) {
const headers = new Headers(response.headers);
const setCookies = response.headers.getSetCookie();
for (const header of HOP_BY_HOP_HEADERS) {
headers.delete(header);
}
headers.delete("content-encoding");
headers.delete("content-length");
headers.delete("set-cookie");
for (const cookie of setCookies) {
headers.append("set-cookie", cookie);
}
return headers;
}
async function getRequestBody(request: NextRequest) {
if (request.method === "GET" || request.method === "HEAD") {
return undefined;
}
return request.arrayBuffer();
}
async function proxyRequest(request: NextRequest) {
const backendUrl = buildBackendUrl(request);
try {
const response = await fetch(backendUrl, {
method: request.method,
headers: createRequestHeaders(request),
body: await getRequestBody(request),
cache: "no-store",
});
return new Response(request.method === "HEAD" ? null : response.body, {
status: response.status,
statusText: response.statusText,
headers: createResponseHeaders(response),
});
} catch (error) {
const message =
error instanceof Error ? error.message : "Unknown backend proxy error";
return NextResponse.json(
{
detail: `Backend request failed while proxying to ${backendUrl}: ${message}`,
},
{ status: 502 },
);
}
}
export const GET = proxyRequest;
export const POST = proxyRequest;
export const PUT = proxyRequest;
export const PATCH = proxyRequest;
export const DELETE = proxyRequest;
export const OPTIONS = proxyRequest;
export const HEAD = proxyRequest;

View file

@ -1,5 +1,7 @@
import "server-only";
import { getServerBackendUrl } from "@/lib/apiClient";
let cachedAuthProvider: string | null = null;
/**
@ -12,7 +14,7 @@ export async function getAuthProvider(): Promise<string> {
}
try {
const backendUrl = process.env.BACKEND_URL || "http://localhost:8000";
const backendUrl = getServerBackendUrl();
const res = await fetch(`${backendUrl}/api/v1/health`, {
next: { revalidate: 300 },
});

View file

@ -1,6 +1,8 @@
import type { NextRequest } from 'next/server';
import { NextResponse } from 'next/server';
import { getServerBackendUrl } from '@/lib/apiClient';
const OSS_TOKEN_COOKIE = 'dograh_auth_token';
// Paths that don't require authentication in OSS mode
@ -14,7 +16,7 @@ async function fetchAuthProvider(): Promise<string> {
}
try {
const backendUrl = process.env.BACKEND_URL || 'http://localhost:8000';
const backendUrl = getServerBackendUrl();
const res = await fetch(`${backendUrl}/api/v1/health`);
if (res.ok) {
const data = await res.json();