feat(webhooks): durable retrying delivery for final webhooks (#478)

* feat(webhooks): durable retrying delivery for final webhooks

Final webhook nodes were fired inline with a single best-effort httpx POST
(run_integrations._execute_webhook_node). On a transient error the failure was
swallowed at three levels, so ARQ never retried and the final call report was
permanently lost -- leaving downstream receivers stuck (e.g. a CRM showing a
call as still "in conversation").

Replace the one-shot POST with a durable, idempotent delivery pipeline modelled
on the campaign retry pattern (persisted row + scheduled_for + bounded attempts):

- New webhook_deliveries table (WebhookDeliveryModel) is the source of truth.
  Payload is rendered once and frozen so retries are deterministic; secrets are
  not stored -- the credential is referenced by uuid and re-resolved at send time.
- run_integrations now persists a delivery row and enqueues deliver_webhook with
  a deterministic ARQ job id instead of sending inline.
- deliver_webhook (new ARQ task) sends the request and:
    * 2xx            -> succeeded
    * transient      -> retry with capped exponential backoff (RequestError /
                        5xx / 408 / 425 / 429), up to max_attempts then dead_letter
    * permanent 4xx  -> dead_letter immediately (no pointless looping)
  It is idempotent: a non-pending delivery is a no-op, so a duplicate enqueue or
  sweeper re-injection can't double-send.
- sweep_webhook_deliveries cron (every 5 min) re-enqueues overdue pending
  deliveries so nothing is lost to a worker restart / Redis flush.
- Stable X-Dograh-Delivery-Id / Workflow-Run-Id / Attempt headers let receivers
  dedupe retried deliveries.
- enqueue_job now forwards ARQ job options (_job_id, _defer_by); failures log
  repr(e) so empty-message errors like ConnectTimeout are diagnosable.

Config via DEFAULT_WEBHOOK_DELIVERY_CONFIG (env-overridable): max_attempts=5,
base_delay=30s, max_delay=600s, timeout=30s.

Tests cover payload rendering, persist+enqueue, success, transient retry,
retryable 5xx, permanent 4xx dead-letter, attempt exhaustion, and idempotency.
Migration verified to apply/rollback against Postgres; table/enum/indexes confirmed.

* fix(webhooks): atomic claim, safe success-recording, sweep paging, migration cleanup

Address review feedback on the webhook delivery pipeline:

- deliver_webhook now atomically claims a delivery (conditional UPDATE that
  leases scheduled_for) before sending, so concurrent ARQ executions can't
  double-send (the prior status=='pending' read was non-atomic).
- Recording success is moved out of the dead-letter try-block: if the receiver
  accepted the payload (2xx) but the success DB-write fails, the row is left
  pending for the sweeper to reconcile instead of being dead-lettered.
- The sweep keyset-paginates by id so a backlog over the page size is fully
  drained, and logs the true re-enqueued total.
- Migration downgrade drops the enum via op.execute(DROP TYPE IF EXISTS ...)
  instead of the deprecated op.get_bind().

* fix(webhooks): idempotent delivery creation and drop secret custom headers

Address the remaining review feedback:

- Add a (workflow_run_id, webhook_node_id) unique constraint and make
  create_webhook_delivery a get-or-create returning (delivery, created). A
  retried run_integrations now reuses the existing row instead of creating and
  sending a duplicate final webhook; only a freshly-created row is enqueued.
- Stop persisting secret-looking custom headers (Authorization, X-API-Key,
  Cookie, ...) in plaintext on the delivery row: they are dropped with a warning
  pointing at the credential store (which is re-resolved securely at send time).
  Non-secret custom headers are unaffected.

* fix(webhooks): harden idempotency key, secret-header match, sweep reclaim id

Address follow-up review feedback:

- webhook_node_id is now NOT NULL so a NULL can't slip past the
  (workflow_run_id, webhook_node_id) unique constraint and create duplicates.
- Secret-header filtering matches normalized markers (auth/token/secret/cookie/
  api-key/...) instead of an exact name list, catching variants like
  X-Custom-Auth-Token while leaving benign headers (e.g. X-Idempotency-Key).
- The sweeper re-enqueues with a reclaim-specific job id (the lease timestamp)
  so reconciling a delivered-but-unrecorded row isn't deduped against the
  original attempt's already-completed ARQ job. The atomic claim still ensures
  at most one send.

* fix(webhooks): scope delivery rows to workflow org

---------

Co-authored-by: Abhishek Kumar <abhishek@a6k.me>
This commit is contained in:
Tararais 2026-07-02 17:14:14 +01:00 committed by GitHub
parent 3a770a6538
commit fd0d144b08
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 1360 additions and 141 deletions

View file

@ -0,0 +1,119 @@
"""add webhook_deliveries
Durable, retrying outbound webhook delivery so a transient network error can't
permanently drop a workflow's final webhook.
Revision ID: b7e3c9a1d2f4
Revises: 91cc6ba3e1c7
Create Date: 2026-06-28 19:40:00.000000
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "b7e3c9a1d2f4"
down_revision: Union[str, None] = "91cc6ba3e1c7"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"webhook_deliveries",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("delivery_uuid", sa.String(length=36), nullable=False),
sa.Column("workflow_run_id", sa.Integer(), nullable=False),
sa.Column("organization_id", sa.Integer(), nullable=False),
sa.Column("webhook_name", sa.String(), nullable=True),
sa.Column("endpoint_url", sa.String(), nullable=False),
sa.Column(
"http_method",
sa.String(),
nullable=False,
),
sa.Column(
"payload",
sa.JSON(),
nullable=False,
),
sa.Column("custom_headers", sa.JSON(), nullable=True),
sa.Column("credential_uuid", sa.String(length=36), nullable=True),
sa.Column("webhook_node_id", sa.String(), nullable=False),
sa.Column(
"status",
sa.Enum(
"pending",
"succeeded",
"dead_letter",
name="webhook_delivery_status",
),
server_default=sa.text("'pending'"),
nullable=False,
),
sa.Column(
"attempt_count",
sa.Integer(),
server_default=sa.text("0"),
nullable=False,
),
sa.Column(
"max_attempts",
sa.Integer(),
server_default=sa.text("5"),
nullable=False,
),
sa.Column("scheduled_for", sa.DateTime(timezone=True), nullable=True),
sa.Column("last_status_code", sa.Integer(), nullable=True),
sa.Column("last_error", sa.Text(), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(
["workflow_run_id"], ["workflow_runs.id"], ondelete="CASCADE"
),
sa.ForeignKeyConstraint(
["organization_id"], ["organizations.id"], ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint(
"workflow_run_id",
"webhook_node_id",
name="uq_webhook_deliveries_run_node",
),
)
op.create_index(
"ix_webhook_deliveries_delivery_uuid",
"webhook_deliveries",
["delivery_uuid"],
unique=True,
)
op.create_index(
"idx_webhook_deliveries_run",
"webhook_deliveries",
["workflow_run_id"],
unique=False,
)
# Partial index for the sweeper's hot path: due pending deliveries.
op.create_index(
"idx_webhook_deliveries_pending_scheduled",
"webhook_deliveries",
["scheduled_for"],
unique=False,
postgresql_where=sa.text("status = 'pending'"),
)
def downgrade() -> None:
op.drop_index(
"idx_webhook_deliveries_pending_scheduled",
table_name="webhook_deliveries",
)
op.drop_index("idx_webhook_deliveries_run", table_name="webhook_deliveries")
op.drop_index(
"ix_webhook_deliveries_delivery_uuid", table_name="webhook_deliveries"
)
op.drop_table("webhook_deliveries")
op.execute("DROP TYPE IF EXISTS webhook_delivery_status")

View file

@ -161,6 +161,18 @@ DEFAULT_CAMPAIGN_RETRY_CONFIG = {
}
# Outbound webhook delivery: bounded retry with exponential backoff.
# Delivery is persisted (see WebhookDeliveryModel) and retried by an ARQ task so a
# transient network error can't permanently drop a final webhook. After
# ``max_attempts`` transient failures the delivery is parked as ``dead_letter``.
DEFAULT_WEBHOOK_DELIVERY_CONFIG = {
"max_attempts": int(os.getenv("WEBHOOK_DELIVERY_MAX_ATTEMPTS", 5)),
"base_delay_seconds": int(os.getenv("WEBHOOK_DELIVERY_BASE_DELAY_SECONDS", 30)),
"max_delay_seconds": int(os.getenv("WEBHOOK_DELIVERY_MAX_DELAY_SECONDS", 600)),
"timeout_seconds": int(os.getenv("WEBHOOK_DELIVERY_TIMEOUT_SECONDS", 30)),
}
# Circuit breaker defaults for campaign call failure detection
DEFAULT_CIRCUIT_BREAKER_CONFIG = {
"enabled": True,

View file

@ -14,6 +14,7 @@ from api.db.telephony_phone_number_client import TelephonyPhoneNumberClient
from api.db.tool_client import ToolClient
from api.db.user_client import UserClient
from api.db.webhook_credential_client import WebhookCredentialClient
from api.db.webhook_delivery_client import WebhookDeliveryClient
from api.db.workflow_client import WorkflowClient
from api.db.workflow_recording_client import WorkflowRecordingClient
from api.db.workflow_run_client import WorkflowRunClient
@ -37,6 +38,7 @@ class DBClient(
EmbedTokenClient,
AgentTriggerClient,
WebhookCredentialClient,
WebhookDeliveryClient,
ToolClient,
KnowledgeBaseClient,
WorkflowRecordingClient,
@ -62,6 +64,7 @@ class DBClient(
- EmbedTokenClient: handles embed token and session operations
- AgentTriggerClient: handles agent trigger operations for API-based call triggering
- WebhookCredentialClient: handles webhook credential operations
- WebhookDeliveryClient: handles durable outbound webhook delivery records
- ToolClient: handles tool operations for reusable HTTP API tools
- KnowledgeBaseClient: handles knowledge base document and vector search operations
- FolderClient: handles folder operations for grouping workflows (agents)

View file

@ -1022,6 +1022,103 @@ class ExternalCredentialModel(Base):
)
class WebhookDeliveryModel(Base):
"""Durable record of an outbound webhook delivery attempt.
Final webhooks (e.g. a workflow's "Final Webhook" node) must not be lost to a
single transient network error. Instead of firing the HTTP request inline and
forgetting it, we persist one row per webhook node per workflow run and let an
ARQ task drive delivery with bounded, backed-off retries. The row is the source
of truth: it survives worker restarts and a periodic sweeper re-enqueues any
``pending`` delivery whose ``scheduled_for`` is overdue. After ``max_attempts``
transient failures (or on a permanent 4xx) the row is parked as ``dead_letter``
for inspection rather than retried forever.
Mirrors the campaign retry pattern (``QueuedRunModel``): persisted state,
``scheduled_for`` gating, a hard attempt ceiling, and a terminal failure state.
"""
__tablename__ = "webhook_deliveries"
id = Column(Integer, primary_key=True, index=True)
# Stable idempotency key sent to the receiver so it can dedupe retries.
delivery_uuid = Column(
String(36),
unique=True,
nullable=False,
index=True,
default=lambda: str(uuid.uuid4()),
)
workflow_run_id = Column(
Integer,
ForeignKey("workflow_runs.id", ondelete="CASCADE"),
nullable=False,
)
organization_id = Column(
Integer, ForeignKey("organizations.id", ondelete="CASCADE"), nullable=False
)
# Frozen request definition. The payload is rendered once at enqueue time so
# retries are deterministic. Secrets are NOT stored here: the auth header is
# re-resolved from ``credential_uuid`` at send time (honours rotation/revocation).
webhook_name = Column(String, nullable=True)
endpoint_url = Column(String, nullable=False)
http_method = Column(String, nullable=False, default="POST")
payload = Column(JSON, nullable=False, default=dict)
custom_headers = Column(JSON, nullable=True)
credential_uuid = Column(String(36), nullable=True)
# Workflow node that produced this delivery. Combined with workflow_run_id it
# is the per-run/per-node idempotency key, so a retried run_integrations does
# not create (and send) a duplicate delivery for the same node. Non-nullable:
# a NULL would be distinct under the unique constraint and defeat the dedupe.
webhook_node_id = Column(String, nullable=False)
status = Column(
Enum(
"pending",
"succeeded",
"dead_letter",
name="webhook_delivery_status",
),
nullable=False,
default="pending",
server_default="pending",
)
attempt_count = Column(Integer, nullable=False, default=0, server_default=text("0"))
max_attempts = Column(Integer, nullable=False, default=5, server_default=text("5"))
# When the next attempt becomes due. NULL once terminal (succeeded/dead_letter).
scheduled_for = Column(DateTime(timezone=True), nullable=True)
last_status_code = Column(Integer, nullable=True)
last_error = Column(Text, nullable=True)
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(UTC))
updated_at = Column(
DateTime(timezone=True),
default=lambda: datetime.now(UTC),
onupdate=lambda: datetime.now(UTC),
)
__table_args__ = (
# Sweeper lookup: due pending deliveries.
Index(
"idx_webhook_deliveries_pending_scheduled",
"scheduled_for",
postgresql_where=text("status = 'pending'"),
),
Index("idx_webhook_deliveries_run", "workflow_run_id"),
# Per-run/per-node idempotency: one delivery per webhook node per run.
UniqueConstraint(
"workflow_run_id",
"webhook_node_id",
name="uq_webhook_deliveries_run_node",
),
)
class ToolModel(Base):
"""Model for storing reusable tools that can be invoked during workflows.

View file

@ -0,0 +1,244 @@
"""Database client for durable outbound webhook deliveries.
Persists one row per webhook node per workflow run and exposes the state
transitions the delivery task and sweeper need: create (pending), succeed,
schedule the next retry, and park as dead-letter. Mirrors the campaign retry
pattern -- the row is the source of truth, ``scheduled_for`` gates due work.
"""
from datetime import UTC, datetime, timedelta
from typing import List, Optional, Tuple
from loguru import logger
from sqlalchemy import or_, select, update
from sqlalchemy.exc import IntegrityError
from api.db.base_client import BaseDBClient
from api.db.models import WebhookDeliveryModel, WorkflowModel, WorkflowRunModel
class WebhookDeliveryClient(BaseDBClient):
"""Client for managing persisted webhook delivery records."""
async def create_webhook_delivery(
self,
workflow_run_id: int,
organization_id: int,
endpoint_url: str,
payload: dict,
max_attempts: int,
http_method: str = "POST",
webhook_name: Optional[str] = None,
custom_headers: Optional[list] = None,
credential_uuid: Optional[str] = None,
webhook_node_id: Optional[str] = None,
scheduled_for: Optional[datetime] = None,
) -> Tuple[WebhookDeliveryModel, bool]:
"""Get-or-create the ``pending`` delivery for this run + webhook node.
Idempotent on ``(workflow_run_id, webhook_node_id)``: a retried
``run_integrations`` returns the existing row instead of creating (and
sending) a duplicate. Returns ``(delivery, created)`` so the caller only
enqueues a send for a freshly-created row.
"""
async with self.async_session() as session:
run_scope_result = await session.execute(
select(WorkflowRunModel.id, WorkflowModel.organization_id)
.join(WorkflowModel, WorkflowRunModel.workflow_id == WorkflowModel.id)
.where(WorkflowRunModel.id == workflow_run_id)
)
run_scope = run_scope_result.one_or_none()
if run_scope is None:
raise ValueError(f"Workflow run {workflow_run_id} not found")
_, run_organization_id = run_scope
if run_organization_id is None:
raise ValueError(
f"Workflow run {workflow_run_id} is not associated with an organization"
)
if run_organization_id != organization_id:
raise ValueError(
f"Workflow run {workflow_run_id} belongs to organization "
f"{run_organization_id}, not {organization_id}"
)
delivery = WebhookDeliveryModel(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
webhook_name=webhook_name,
webhook_node_id=webhook_node_id,
endpoint_url=endpoint_url,
http_method=http_method,
payload=payload,
custom_headers=custom_headers,
credential_uuid=credential_uuid,
max_attempts=max_attempts,
status="pending",
attempt_count=0,
scheduled_for=scheduled_for or datetime.now(UTC),
)
session.add(delivery)
try:
await session.commit()
except IntegrityError:
await session.rollback()
existing = await session.execute(
select(WebhookDeliveryModel).where(
WebhookDeliveryModel.workflow_run_id == workflow_run_id,
WebhookDeliveryModel.webhook_node_id == webhook_node_id,
)
)
row = existing.scalar_one_or_none()
if row is not None:
return row, False
# The violation was not the run+node uniqueness -- re-raise.
raise
await session.refresh(delivery)
return delivery, True
async def get_webhook_delivery(
self, delivery_id: int
) -> Optional[WebhookDeliveryModel]:
async with self.async_session() as session:
result = await session.execute(
select(WebhookDeliveryModel).where(
WebhookDeliveryModel.id == delivery_id
)
)
return result.scalar_one_or_none()
async def claim_webhook_delivery(
self, delivery_id: int, lease_seconds: int
) -> Optional[WebhookDeliveryModel]:
"""Atomically claim a pending, due delivery for one worker to process.
A conditional UPDATE pushes ``scheduled_for`` out by a short lease. Only
one concurrent worker can win -- the others re-evaluate the WHERE after
the first commits, see the future ``scheduled_for``, match nothing, and
get ``None``. This prevents the non-atomic ``status == 'pending'`` read
from letting two workers double-send the same delivery. If the winning
worker crashes mid-send, the lease expires and the sweeper re-enqueues it.
Returns the claimed row, or ``None`` if it was not claimable (already
claimed, not pending, or not yet due).
"""
now = datetime.now(UTC)
lease_until = now + timedelta(seconds=lease_seconds)
async with self.async_session() as session:
result = await session.execute(
update(WebhookDeliveryModel)
.where(
WebhookDeliveryModel.id == delivery_id,
WebhookDeliveryModel.status == "pending",
or_(
WebhookDeliveryModel.scheduled_for.is_(None),
WebhookDeliveryModel.scheduled_for <= now,
),
)
.values(scheduled_for=lease_until, updated_at=now)
)
await session.commit()
if result.rowcount == 0:
return None
fetched = await session.execute(
select(WebhookDeliveryModel).where(
WebhookDeliveryModel.id == delivery_id
)
)
return fetched.scalar_one_or_none()
async def mark_webhook_delivery_succeeded(
self, delivery_id: int, attempt_count: int, status_code: Optional[int]
) -> None:
async with self.async_session() as session:
await session.execute(
update(WebhookDeliveryModel)
.where(WebhookDeliveryModel.id == delivery_id)
.values(
status="succeeded",
attempt_count=attempt_count,
last_status_code=status_code,
last_error=None,
scheduled_for=None,
updated_at=datetime.now(UTC),
)
)
await session.commit()
async def schedule_webhook_delivery_retry(
self,
delivery_id: int,
attempt_count: int,
scheduled_for: datetime,
last_error: str,
last_status_code: Optional[int],
) -> None:
"""Record a transient failure and set when the next attempt is due."""
async with self.async_session() as session:
await session.execute(
update(WebhookDeliveryModel)
.where(WebhookDeliveryModel.id == delivery_id)
.values(
status="pending",
attempt_count=attempt_count,
scheduled_for=scheduled_for,
last_error=last_error[:2000] if last_error else last_error,
last_status_code=last_status_code,
updated_at=datetime.now(UTC),
)
)
await session.commit()
async def mark_webhook_delivery_dead_letter(
self,
delivery_id: int,
attempt_count: int,
last_error: str,
last_status_code: Optional[int],
) -> None:
"""Terminal failure: parked for inspection, never retried again."""
async with self.async_session() as session:
await session.execute(
update(WebhookDeliveryModel)
.where(WebhookDeliveryModel.id == delivery_id)
.values(
status="dead_letter",
attempt_count=attempt_count,
last_error=last_error[:2000] if last_error else last_error,
last_status_code=last_status_code,
scheduled_for=None,
updated_at=datetime.now(UTC),
)
)
await session.commit()
logger.warning(
f"Webhook delivery {delivery_id} dead-lettered after "
f"{attempt_count} attempts: {last_error}"
)
async def get_due_webhook_deliveries(
self, now: Optional[datetime] = None, limit: int = 100, after_id: int = 0
) -> List[WebhookDeliveryModel]:
"""One page of pending deliveries whose next attempt is due.
Used by the periodic sweeper to re-enqueue deliveries whose ARQ job was
lost (worker restart, Redis flush). The delivery task is idempotent, so a
spurious re-enqueue is harmless. Ordered by ``id`` and gated on
``after_id`` for keyset pagination -- re-enqueuing does not change a row's
due state, so the sweeper pages by id to drain the whole backlog instead
of re-reading the same first page forever.
"""
cutoff = now or datetime.now(UTC)
async with self.async_session() as session:
result = await session.execute(
select(WebhookDeliveryModel)
.where(
WebhookDeliveryModel.status == "pending",
WebhookDeliveryModel.scheduled_for.isnot(None),
WebhookDeliveryModel.scheduled_for <= cutoff,
WebhookDeliveryModel.id > after_id,
)
.order_by(WebhookDeliveryModel.id)
.limit(limit)
)
return list(result.scalars().all())

View file

@ -438,10 +438,10 @@ class WorkflowRunClient(BaseDBClient):
if not workflow_run:
return None, None
if not workflow_run.workflow or not workflow_run.workflow.user:
if not workflow_run.workflow:
return workflow_run, None
organization_id = workflow_run.workflow.user.selected_organization_id
organization_id = workflow_run.workflow.organization_id
return workflow_run, organization_id
async def ensure_public_access_token(self, workflow_run_id: int) -> Optional[str]:

View file

@ -205,6 +205,7 @@ class TransferCallConfig(BaseModel):
description="Maximum seconds to wait for the destination to answer.",
)
class McpToolConfig(BaseModel):
"""Configuration for a customer MCP server tool definition."""

View file

@ -12,7 +12,7 @@ from api.tasks.function_names import FunctionNames
setup_logging()
# Now import ARQ and task dependencies
from arq import create_pool
from arq import create_pool, cron
from arq.connections import ArqRedis, RedisSettings
parsed_url = urlparse(REDIS_URL)
@ -46,6 +46,7 @@ from api.tasks.campaign_tasks import (
from api.tasks.knowledge_base_processing import process_knowledge_base_document
from api.tasks.run_integrations import run_integrations_post_workflow_run
from api.tasks.s3_upload import upload_voicemail_audio_to_s3
from api.tasks.webhook_delivery import deliver_webhook, sweep_webhook_deliveries
from api.tasks.workflow_completion import process_workflow_completion
@ -57,8 +58,18 @@ class WorkerSettings:
sync_campaign_source,
process_campaign_batch,
process_knowledge_base_document,
deliver_webhook,
]
cron_jobs = [
# Safety net for webhook deliveries whose ARQ job was lost (worker
# restart / Redis flush): re-enqueue any pending delivery that is overdue.
cron(
sweep_webhook_deliveries,
minute=set(range(0, 60, 5)),
second=0,
run_at_startup=True,
),
]
cron_jobs = []
redis_settings = REDIS_SETTINGS
max_jobs = 10
@ -107,6 +118,8 @@ async def get_arq_redis() -> ArqRedis:
return _redis_pool
async def enqueue_job(function_name: FunctionNames, *args):
async def enqueue_job(function_name: FunctionNames, *args, **kwargs):
redis = await get_arq_redis()
await redis.enqueue_job(function_name, *args)
# kwargs forwards ARQ job options (e.g. _job_id, _defer_by) used for
# deterministic, backed-off webhook delivery retries.
return await redis.enqueue_job(function_name, *args, **kwargs)

View file

@ -5,3 +5,4 @@ class FunctionNames:
SYNC_CAMPAIGN_SOURCE = "sync_campaign_source"
PROCESS_CAMPAIGN_BATCH = "process_campaign_batch"
PROCESS_KNOWLEDGE_BASE_DOCUMENT = "process_knowledge_base_document"
DELIVER_WEBHOOK = "deliver_webhook"

View file

@ -1,15 +1,15 @@
"""Execute integrations (QA analysis, webhooks) after workflow run completion."""
import random
from datetime import UTC, datetime
from typing import Any, Dict, Optional
import httpx
from loguru import logger
from pipecat.utils.enums import EndTaskReason
from pipecat.utils.run_context import set_current_org_id, set_current_run_id
from pydantic import ValidationError
from api.constants import BACKEND_API_ENDPOINT
from api.constants import BACKEND_API_ENDPOINT, DEFAULT_WEBHOOK_DELIVERY_CONFIG
from api.db import db_client
from api.db.models import WorkflowRunModel
from api.enums import OrganizationConfigurationKey
@ -26,7 +26,7 @@ from api.services.workflow.dto import (
WebhookRFNode,
)
from api.services.workflow.qa import run_per_node_qa_analysis
from api.utils.credential_auth import build_auth_header
from api.tasks.function_names import FunctionNames
from api.utils.recording_artifacts import get_recording_storage_key
from api.utils.template_renderer import render_template
@ -315,13 +315,15 @@ async def run_integrations_post_workflow_run(_ctx, workflow_run_id: int):
webhook_data = webhook_node.data
try:
await _execute_webhook_node(
await _enqueue_webhook_delivery(
webhook_data=webhook_data,
render_context=render_context,
organization_id=organization_id,
workflow_run_id=workflow_run_id,
webhook_node_id=str(node_id),
)
except Exception as e:
logger.warning(f"Failed to execute webhook '{webhook_data.name}': {e}")
logger.warning(f"Failed to enqueue webhook '{webhook_data.name}': {e}")
except Exception as e:
logger.error(f"Error running integrations: {e}", exc_info=True)
@ -387,98 +389,140 @@ def _build_render_context(
return context
async def _execute_webhook_node(
webhook_data: WebhookNodeData,
render_context: Dict[str, Any],
organization_id: int,
) -> bool:
def _build_webhook_payload(
webhook_data: WebhookNodeData, render_context: Dict[str, Any]
) -> Any:
"""Render the webhook payload once, so retries are deterministic.
Always surfaces the call disposition on the outgoing payload, even when the
template author didn't reference it. Fill only if absent so a template that
sets it explicitly keeps its own value.
"""
Execute a single webhook node.
Args:
webhook_data: The validated webhook node data
render_context: Context for template rendering
organization_id: For credential lookup
Returns:
True if successful, False otherwise
"""
webhook_name = webhook_data.name
if not webhook_data.enabled:
logger.debug(f"Webhook '{webhook_name}' is disabled, skipping")
return True
url = webhook_data.endpoint_url
if not url:
logger.warning(f"Webhook '{webhook_name}' has no endpoint URL")
return False
headers = {"Content-Type": "application/json"}
credential_uuid = webhook_data.credential_uuid
if credential_uuid:
credential = await db_client.get_credential_by_uuid(
credential_uuid, organization_id
)
if credential:
auth_header = build_auth_header(credential)
headers.update(auth_header)
logger.debug(f"Applied credential '{credential.name}' to webhook")
else:
logger.warning(
f"Credential {credential_uuid} not found for webhook '{webhook_name}'"
)
for h in webhook_data.custom_headers or []:
if h.key and h.value:
headers[h.key] = h.value
payload = render_template(webhook_data.payload_template or {}, render_context)
# Always surface the call disposition on the outgoing payload, even when the
# template author didn't reference it. Fill only if absent so a template that
# sets it explicitly keeps its own value.
if isinstance(payload, dict):
gathered_context = render_context.get("gathered_context") or {}
payload.setdefault(
"call_disposition", gathered_context.get("call_disposition", "")
)
return payload
# Substrings that mark a header as likely carrying a secret. Matched against the
# normalized key so variants are caught too (e.g. ``X-Custom-Auth-Token``,
# ``My-Api-Key``), not just exact names. Their values are NOT persisted on the
# delivery row (which would store them in plaintext); secrets belong in the
# credential store, re-resolved at send time. Bare "key" is intentionally absent
# to avoid dropping benign headers like ``X-Idempotency-Key``.
_SECRET_HEADER_MARKERS = (
"authorization",
"auth",
"token",
"secret",
"password",
"passwd",
"cookie",
"credential",
"api-key",
"apikey",
"api_key",
"access-key",
)
def _looks_like_secret_header(key: str) -> bool:
normalized = key.strip().lower()
return any(marker in normalized for marker in _SECRET_HEADER_MARKERS)
def _safe_custom_headers(
webhook_data: WebhookNodeData, webhook_name: str
) -> list[dict]:
"""Custom headers to persist, with secret-looking ones dropped.
Persisting arbitrary header values would store credentials (Authorization,
X-API-Key, ...) in plaintext on the delivery row. Drop those and tell the
operator to use a credential instead.
"""
safe = []
for h in webhook_data.custom_headers or []:
if not (h.key and h.value):
continue
if _looks_like_secret_header(h.key):
logger.warning(
f"Webhook '{webhook_name}' custom header '{h.key}' looks like a "
f"secret; it will not be stored or sent. Use a credential instead."
)
continue
safe.append({"key": h.key, "value": h.value})
return safe
async def _enqueue_webhook_delivery(
webhook_data: WebhookNodeData,
render_context: Dict[str, Any],
organization_id: int,
workflow_run_id: int,
webhook_node_id: str,
) -> None:
"""Persist a durable delivery record and enqueue its first send attempt.
The actual HTTP request is performed by the ``deliver_webhook`` task, which
retries transient failures with backoff and dead-letters exhausted/permanent
ones. This replaces the previous one-shot, best-effort inline POST that lost
the webhook entirely on a single network error.
Idempotent on ``(workflow_run_id, webhook_node_id)``: a retried run reuses the
existing delivery row and does not enqueue a second send.
"""
webhook_name = webhook_data.name
if not webhook_data.enabled:
logger.debug(f"Webhook '{webhook_name}' is disabled, skipping")
return
url = webhook_data.endpoint_url
if not url:
logger.warning(f"Webhook '{webhook_name}' has no endpoint URL")
return
payload = _build_webhook_payload(webhook_data, render_context)
# Persist non-secret request definition. The credential is stored by reference
# (uuid) and re-resolved at send time so secrets never land in this row.
custom_headers = _safe_custom_headers(webhook_data, webhook_name)
method = (webhook_data.http_method or "POST").upper()
logger.info(f"Executing webhook '{webhook_name}': {method}")
delivery, created = await db_client.create_webhook_delivery(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
endpoint_url=url,
payload=payload,
max_attempts=DEFAULT_WEBHOOK_DELIVERY_CONFIG["max_attempts"],
http_method=method,
webhook_name=webhook_name,
custom_headers=custom_headers or None,
credential_uuid=webhook_data.credential_uuid,
webhook_node_id=webhook_node_id,
)
try:
async with httpx.AsyncClient() as client:
if method in ("POST", "PUT", "PATCH"):
response = await client.request(
method=method,
url=url,
json=payload,
headers=headers,
timeout=30.0,
)
else: # GET, DELETE
response = await client.request(
method=method,
url=url,
headers=headers,
timeout=30.0,
)
response.raise_for_status()
logger.info(f"Webhook '{webhook_name}' succeeded: {response.status_code}")
return True
except httpx.HTTPStatusError as e:
logger.warning(
f"Webhook '{webhook_name}' failed: {e.response.status_code} - {e.response.text[:200]}"
if not created:
logger.info(
f"Webhook '{webhook_name}' delivery already exists for run "
f"{workflow_run_id} node {webhook_node_id}; not re-enqueuing"
)
return False
except httpx.RequestError as e:
logger.warning(f"Webhook '{webhook_name}' request error: {e}")
return False
except Exception as e:
logger.error(f"Webhook '{webhook_name}' unexpected error: {e}")
return False
return
# Lazy import avoids a circular import (arq imports this module at load time).
from api.tasks.arq import enqueue_job
await enqueue_job(
FunctionNames.DELIVER_WEBHOOK,
delivery.id,
_job_id=f"webhook-delivery-{delivery.id}-0",
)
logger.info(
f"Enqueued webhook '{webhook_name}' delivery {delivery.delivery_uuid} "
f"for run {workflow_run_id}"
)

View file

@ -0,0 +1,266 @@
"""Durable, retrying delivery of outbound webhooks.
A workflow's final webhook must survive a transient network error. Rather than
firing the HTTP POST inline and forgetting it, ``run_integrations`` persists a
``WebhookDeliveryModel`` row and enqueues :func:`deliver_webhook`. This task sends
the request and, on a *transient* failure, schedules the next attempt with
exponential backoff -- up to ``max_attempts``, after which the delivery is parked
as ``dead_letter`` for inspection. Permanent failures (most 4xx) dead-letter
immediately instead of looping.
A periodic :func:`sweep_webhook_deliveries` cron re-enqueues any ``pending``
delivery whose attempt is overdue, so deliveries survive worker restarts / lost
ARQ jobs. The DB row is the source of truth; this task is idempotent and only
acts on a delivery that is still ``pending``.
"""
from datetime import UTC, datetime, timedelta
from typing import Optional
import httpx
from loguru import logger
from pipecat.utils.run_context import set_current_run_id
from api.constants import DEFAULT_WEBHOOK_DELIVERY_CONFIG
from api.db import db_client
from api.db.models import WebhookDeliveryModel
from api.tasks.function_names import FunctionNames
from api.utils.credential_auth import build_auth_header
# HTTP statuses that are worth retrying even though the server answered.
_RETRYABLE_STATUS_CODES = {408, 425, 429, 500, 502, 503, 504}
def _delivery_job_id(delivery_id: int, attempt_count: int) -> str:
"""Deterministic ARQ job id so duplicate enqueues (task re-enqueue + sweeper)
collapse to one job instead of double-sending."""
return f"webhook-delivery-{delivery_id}-{attempt_count}"
def _backoff_seconds(attempt: int) -> int:
"""Exponential backoff (capped) for the next attempt after `attempt` failures."""
base = DEFAULT_WEBHOOK_DELIVERY_CONFIG["base_delay_seconds"]
cap = DEFAULT_WEBHOOK_DELIVERY_CONFIG["max_delay_seconds"]
return min(base * (2 ** (attempt - 1)), cap)
async def _enqueue_delivery(
delivery_id: int,
attempt_count: int,
defer_by: int = 0,
reclaim_token: Optional[int] = None,
):
"""Enqueue a delivery attempt with a dedup-safe job id.
The normal (task self-retry) path uses a deterministic id so a retry and a
sweeper pass for the *same* attempt collapse to one job. The sweeper passes a
``reclaim_token`` (the lease timestamp) to get a distinct id, so reconciling a
delivered-but-unrecorded row is not deduped against the original attempt's
already-completed job. The atomic claim still guarantees at most one send.
"""
from api.tasks.arq import enqueue_job # lazy import avoids circular import
if reclaim_token is not None:
job_id = f"webhook-delivery-reclaim-{delivery_id}-{reclaim_token}"
else:
job_id = _delivery_job_id(delivery_id, attempt_count)
await enqueue_job(
FunctionNames.DELIVER_WEBHOOK,
delivery_id,
_job_id=job_id,
_defer_by=defer_by,
)
async def _build_headers(delivery: WebhookDeliveryModel, attempt: int) -> dict:
"""Assemble request headers, re-resolving credential auth at send time so
secrets are never persisted on the delivery row and rotation is honoured."""
headers = {"Content-Type": "application/json"}
if delivery.credential_uuid:
credential = await db_client.get_credential_by_uuid(
delivery.credential_uuid, delivery.organization_id
)
if credential:
headers.update(build_auth_header(credential))
else:
logger.warning(
f"Credential {delivery.credential_uuid} not found for webhook "
f"'{delivery.webhook_name}' (delivery {delivery.id})"
)
for h in delivery.custom_headers or []:
key, value = h.get("key"), h.get("value")
if key and value:
headers[key] = value
# Stable idempotency signal so the receiver can dedupe retried deliveries.
headers["X-Dograh-Delivery-Id"] = delivery.delivery_uuid
headers["X-Dograh-Workflow-Run-Id"] = str(delivery.workflow_run_id)
headers["X-Dograh-Delivery-Attempt"] = str(attempt)
return headers
async def _handle_transient_failure(
delivery: WebhookDeliveryModel,
attempt: int,
error: str,
status_code: Optional[int],
) -> None:
"""Schedule a backed-off retry, or dead-letter once attempts are exhausted."""
if attempt >= delivery.max_attempts:
await db_client.mark_webhook_delivery_dead_letter(
delivery.id, attempt, error, status_code
)
return
delay = _backoff_seconds(attempt)
scheduled_for = datetime.now(UTC) + timedelta(seconds=delay)
await db_client.schedule_webhook_delivery_retry(
delivery_id=delivery.id,
attempt_count=attempt,
scheduled_for=scheduled_for,
last_error=error,
last_status_code=status_code,
)
await _enqueue_delivery(delivery.id, attempt_count=attempt, defer_by=delay)
logger.warning(
f"Webhook '{delivery.webhook_name}' delivery {delivery.id} attempt {attempt} "
f"failed ({error}); retrying in {delay}s "
f"(attempt {attempt + 1}/{delivery.max_attempts})"
)
async def deliver_webhook(_ctx, delivery_id: int) -> None:
"""Send one webhook delivery attempt and record the outcome.
Concurrency-safe: the delivery is atomically *claimed* before the HTTP
request (a conditional update only one worker can win), so a duplicate
enqueue or sweeper re-injection cannot double-send. A claim that returns
nothing means another worker owns it, or it is no longer pending/due -- a
no-op.
"""
# Lease long enough to outlast a full attempt so the sweeper does not reclaim
# a delivery that is still in flight.
lease_seconds = DEFAULT_WEBHOOK_DELIVERY_CONFIG["timeout_seconds"] + 60
delivery = await db_client.claim_webhook_delivery(delivery_id, lease_seconds)
if delivery is None:
logger.debug(
f"Webhook delivery {delivery_id} not claimable "
f"(already claimed, not pending, or not yet due); skipping"
)
return
set_current_run_id(str(delivery.workflow_run_id))
attempt = delivery.attempt_count + 1
method = (delivery.http_method or "POST").upper()
timeout = DEFAULT_WEBHOOK_DELIVERY_CONFIG["timeout_seconds"]
try:
headers = await _build_headers(delivery, attempt)
async with httpx.AsyncClient() as client:
if method in ("POST", "PUT", "PATCH"):
response = await client.request(
method=method,
url=delivery.endpoint_url,
json=delivery.payload,
headers=headers,
timeout=timeout,
)
else: # GET, DELETE
response = await client.request(
method=method,
url=delivery.endpoint_url,
headers=headers,
timeout=timeout,
)
response.raise_for_status()
except httpx.HTTPStatusError as e:
status_code = e.response.status_code
error = f"HTTP {status_code}: {e.response.text[:200]}"
if status_code in _RETRYABLE_STATUS_CODES:
await _handle_transient_failure(delivery, attempt, error, status_code)
else:
# Permanent (auth/validation/not-found): retrying won't help. Park it.
await db_client.mark_webhook_delivery_dead_letter(
delivery.id, attempt, error, status_code
)
return
except httpx.RequestError as e:
# Connect/read timeouts, DNS, connection resets -- the transient class that
# previously lost the webhook entirely. str(e) is often empty, so use repr.
await _handle_transient_failure(delivery, attempt, repr(e), None)
return
except Exception as e:
# Unexpected (e.g. a bug): don't loop on it, surface as dead-letter.
logger.error(
f"Webhook '{delivery.webhook_name}' delivery {delivery.id} "
f"unexpected error: {e!r}"
)
await db_client.mark_webhook_delivery_dead_letter(
delivery.id, attempt, repr(e), None
)
return
# The receiver accepted the payload (2xx). Recording success must NOT be able
# to dead-letter an already-delivered webhook: if this DB write fails, log and
# leave the row claimed-but-pending so the sweeper reconciles it once the
# lease expires (the receiver dedups the re-send via X-Dograh-Delivery-Id).
try:
await db_client.mark_webhook_delivery_succeeded(
delivery.id, attempt, response.status_code
)
logger.info(
f"Webhook '{delivery.webhook_name}' delivery {delivery.id} succeeded: "
f"{response.status_code} (attempt {attempt})"
)
except Exception as e:
logger.error(
f"Webhook '{delivery.webhook_name}' delivery {delivery.id} was "
f"delivered ({response.status_code}) but recording success failed; "
f"leaving it for the sweeper to reconcile after the lease expires: {e!r}"
)
async def sweep_webhook_deliveries(_ctx) -> None:
"""Safety net: re-enqueue pending deliveries whose attempt is overdue.
Handles ARQ jobs lost to a worker restart or Redis flush. Re-enqueuing uses the
same deterministic job id, so if the original deferred job still exists this is a
no-op; it only re-injects genuinely lost work. ``deliver_webhook`` is idempotent.
"""
page_size = 100
after_id = 0
total = 0
while True:
# Re-enqueuing does not change a row's due state, so we cannot page by
# re-querying the first rows (we'd loop on the same page). Page by id
# instead to drain the whole backlog -- e.g. after a prolonged outage.
due = await db_client.get_due_webhook_deliveries(
now=datetime.now(UTC), limit=page_size, after_id=after_id
)
if not due:
break
for delivery in due:
# A reclaim token (the current lease timestamp) gives this a fresh job
# id so it is not deduped against the original attempt's completed job
# -- otherwise a delivered-but-unrecorded row could sit until ARQ's
# result retention clears.
reclaim_token = (
int(delivery.scheduled_for.timestamp()) if delivery.scheduled_for else 0
)
await _enqueue_delivery(
delivery.id,
attempt_count=delivery.attempt_count,
reclaim_token=reclaim_token,
)
total += len(due)
after_id = due[-1].id
if len(due) < page_size:
break
if total:
logger.info(f"Webhook delivery sweep: re-enqueued {total} due deliveries")

View file

@ -1,32 +1,29 @@
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
from uuid import uuid4
import httpx
import pytest
from api.db.models import (
OrganizationModel,
UserModel,
WorkflowModel,
WorkflowRunModel,
)
from api.services.workflow.dto import WebhookNodeData
from api.tasks.run_integrations import _execute_webhook_node
from api.tasks.run_integrations import (
_build_webhook_payload,
_enqueue_webhook_delivery,
)
from api.tasks.webhook_delivery import deliver_webhook
# ---------------------------------------------------------------------------
# Payload rendering (call_disposition injection)
# ---------------------------------------------------------------------------
def _mock_httpx_client(captured: dict):
"""Build a patch target for httpx.AsyncClient that records the request kwargs."""
response = MagicMock()
response.status_code = 200
response.raise_for_status = MagicMock()
async def _request(**kwargs):
captured.update(kwargs)
return response
client = MagicMock()
client.request = AsyncMock(side_effect=_request)
ctx = MagicMock()
ctx.__aenter__ = AsyncMock(return_value=client)
ctx.__aexit__ = AsyncMock(return_value=False)
return MagicMock(return_value=ctx)
@pytest.mark.asyncio
async def test_webhook_injects_disposition_when_absent():
def test_build_webhook_payload_injects_disposition_when_absent():
"""call_disposition is added to the payload when the template omits it."""
webhook = WebhookNodeData(
name="Test Webhook",
@ -36,21 +33,12 @@ async def test_webhook_injects_disposition_when_absent():
)
render_context = {"gathered_context": {"call_disposition": "no-answer"}}
captured: dict = {}
with patch(
"api.tasks.run_integrations.httpx.AsyncClient", _mock_httpx_client(captured)
):
ok = await _execute_webhook_node(webhook, render_context, organization_id=1)
payload = _build_webhook_payload(webhook, render_context)
assert ok is True
assert captured["json"] == {
"event": "call_done",
"call_disposition": "no-answer",
}
assert payload == {"event": "call_done", "call_disposition": "no-answer"}
@pytest.mark.asyncio
async def test_webhook_preserves_template_disposition():
def test_build_webhook_payload_preserves_template_disposition():
"""A disposition key set explicitly in the template is not overwritten."""
webhook = WebhookNodeData(
name="Test Webhook",
@ -60,17 +48,12 @@ async def test_webhook_preserves_template_disposition():
)
render_context = {"gathered_context": {"call_disposition": "no-answer"}}
captured: dict = {}
with patch(
"api.tasks.run_integrations.httpx.AsyncClient", _mock_httpx_client(captured)
):
await _execute_webhook_node(webhook, render_context, organization_id=1)
payload = _build_webhook_payload(webhook, render_context)
assert captured["json"]["call_disposition"] == "custom-from-template"
assert payload["call_disposition"] == "custom-from-template"
@pytest.mark.asyncio
async def test_webhook_injects_empty_disposition_when_context_missing():
def test_build_webhook_payload_empty_disposition_when_context_missing():
"""Missing gathered_context values fall back to an empty string, not omission."""
webhook = WebhookNodeData(
name="Test Webhook",
@ -79,10 +62,446 @@ async def test_webhook_injects_empty_disposition_when_context_missing():
payload_template={},
)
captured: dict = {}
with patch(
"api.tasks.run_integrations.httpx.AsyncClient", _mock_httpx_client(captured)
):
await _execute_webhook_node(webhook, {}, organization_id=1)
payload = _build_webhook_payload(webhook, {})
assert captured["json"] == {"call_disposition": ""}
assert payload == {"call_disposition": ""}
# ---------------------------------------------------------------------------
# Enqueue: persist a delivery row and schedule the first send
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_enqueue_webhook_delivery_persists_and_enqueues():
created = SimpleNamespace(id=42, delivery_uuid="uuid-42")
db = MagicMock()
db.create_webhook_delivery = AsyncMock(return_value=(created, True))
enqueue = AsyncMock()
webhook = WebhookNodeData(
name="Final Webhook",
enabled=True,
endpoint_url="https://example.com/hook",
http_method="POST",
payload_template={"event": "call_done"},
)
with (
patch("api.tasks.run_integrations.db_client", db),
patch("api.tasks.arq.enqueue_job", enqueue),
):
await _enqueue_webhook_delivery(
webhook_data=webhook,
render_context={"gathered_context": {"call_disposition": "user_hangup"}},
organization_id=7,
workflow_run_id=9,
webhook_node_id="node-1",
)
db.create_webhook_delivery.assert_awaited_once()
kwargs = db.create_webhook_delivery.call_args.kwargs
assert kwargs["workflow_run_id"] == 9
assert kwargs["organization_id"] == 7
assert kwargs["endpoint_url"] == "https://example.com/hook"
assert kwargs["payload"]["call_disposition"] == "user_hangup"
assert kwargs["webhook_node_id"] == "node-1"
enqueue.assert_awaited_once()
# Deterministic job id for the first attempt (dedup-safe).
assert enqueue.call_args.kwargs["_job_id"] == "webhook-delivery-42-0"
@pytest.mark.asyncio
async def test_enqueue_webhook_delivery_idempotent_does_not_reenqueue():
# A retried run gets the existing row back (created=False) -> no second send.
existing = SimpleNamespace(id=42, delivery_uuid="uuid-42")
db = MagicMock()
db.create_webhook_delivery = AsyncMock(return_value=(existing, False))
enqueue = AsyncMock()
webhook = WebhookNodeData(
name="Final Webhook",
enabled=True,
endpoint_url="https://example.com/hook",
payload_template={"event": "call_done"},
)
with (
patch("api.tasks.run_integrations.db_client", db),
patch("api.tasks.arq.enqueue_job", enqueue),
):
await _enqueue_webhook_delivery(
webhook_data=webhook,
render_context={},
organization_id=7,
workflow_run_id=9,
webhook_node_id="node-1",
)
db.create_webhook_delivery.assert_awaited_once()
enqueue.assert_not_called()
@pytest.mark.asyncio
async def test_enqueue_webhook_delivery_drops_secret_custom_headers():
created = SimpleNamespace(id=1, delivery_uuid="u")
db = MagicMock()
db.create_webhook_delivery = AsyncMock(return_value=(created, True))
webhook = WebhookNodeData(
name="Final Webhook",
enabled=True,
endpoint_url="https://example.com/hook",
payload_template={},
custom_headers=[
{"key": "Authorization", "value": "Bearer secret-token"},
{"key": "X-Custom-Auth-Token", "value": "abc"}, # variant -> dropped
{"key": "X-Idempotency-Key", "value": "idem-1"}, # benign -> kept
{"key": "X-Source", "value": "dograh"},
],
)
with (
patch("api.tasks.run_integrations.db_client", db),
patch("api.tasks.arq.enqueue_job", AsyncMock()),
):
await _enqueue_webhook_delivery(
webhook_data=webhook,
render_context={},
organization_id=1,
workflow_run_id=1,
webhook_node_id="n",
)
persisted = db.create_webhook_delivery.call_args.kwargs["custom_headers"]
keys = {h["key"] for h in persisted}
assert "Authorization" not in keys # secret dropped, not stored in plaintext
assert "X-Custom-Auth-Token" not in keys # variant secret also dropped
assert "X-Idempotency-Key" in keys # benign 'key' header NOT a false positive
assert "X-Source" in keys # non-secret header kept
@pytest.mark.asyncio
async def test_enqueue_webhook_delivery_skips_disabled():
db = MagicMock()
db.create_webhook_delivery = AsyncMock()
webhook = WebhookNodeData(
name="Disabled",
enabled=False,
endpoint_url="https://example.com/hook",
payload_template={},
)
with patch("api.tasks.run_integrations.db_client", db):
await _enqueue_webhook_delivery(
webhook_data=webhook,
render_context={},
organization_id=1,
workflow_run_id=1,
webhook_node_id="n",
)
db.create_webhook_delivery.assert_not_called()
@pytest.mark.asyncio
async def test_get_workflow_run_with_context_uses_workflow_org(
async_session, db_session
):
run_org = OrganizationModel(provider_id=f"run-org-{uuid4()}")
selected_org = OrganizationModel(provider_id=f"selected-org-{uuid4()}")
async_session.add_all([run_org, selected_org])
await async_session.flush()
user = UserModel(
provider_id=f"user-{uuid4()}",
selected_organization_id=selected_org.id,
)
async_session.add(user)
await async_session.flush()
workflow = WorkflowModel(
name="Webhook Workflow",
user_id=user.id,
organization_id=run_org.id,
workflow_definition={"nodes": [], "edges": []},
template_context_variables={},
)
async_session.add(workflow)
await async_session.flush()
workflow_run = WorkflowRunModel(
name="Webhook Run",
workflow_id=workflow.id,
mode="test",
)
async_session.add(workflow_run)
await async_session.flush()
_, organization_id = await db_session.get_workflow_run_with_context(workflow_run.id)
assert organization_id == run_org.id
assert organization_id != selected_org.id
@pytest.mark.asyncio
async def test_create_webhook_delivery_rejects_org_mismatch(async_session, db_session):
run_org = OrganizationModel(provider_id=f"run-org-{uuid4()}")
wrong_org = OrganizationModel(provider_id=f"wrong-org-{uuid4()}")
async_session.add_all([run_org, wrong_org])
await async_session.flush()
user = UserModel(
provider_id=f"user-{uuid4()}",
selected_organization_id=wrong_org.id,
)
async_session.add(user)
await async_session.flush()
workflow = WorkflowModel(
name="Webhook Workflow",
user_id=user.id,
organization_id=run_org.id,
workflow_definition={"nodes": [], "edges": []},
template_context_variables={},
)
async_session.add(workflow)
await async_session.flush()
workflow_run = WorkflowRunModel(
name="Webhook Run",
workflow_id=workflow.id,
mode="test",
)
async_session.add(workflow_run)
await async_session.flush()
with pytest.raises(ValueError, match="belongs to organization"):
await db_session.create_webhook_delivery(
workflow_run_id=workflow_run.id,
organization_id=wrong_org.id,
endpoint_url="https://example.com/hook",
payload={"event": "call_done"},
max_attempts=5,
webhook_node_id="node-1",
)
delivery, created = await db_session.create_webhook_delivery(
workflow_run_id=workflow_run.id,
organization_id=run_org.id,
endpoint_url="https://example.com/hook",
payload={"event": "call_done"},
max_attempts=5,
webhook_node_id="node-1",
)
assert created is True
assert delivery.organization_id == run_org.id
# ---------------------------------------------------------------------------
# Delivery task: send, retry, dead-letter
# ---------------------------------------------------------------------------
def _fake_delivery(**overrides):
base = dict(
id=1,
delivery_uuid="uuid-1",
workflow_run_id=9,
organization_id=7,
webhook_name="Final Webhook",
endpoint_url="https://example.com/hook",
http_method="POST",
payload={"event": "call_done"},
custom_headers=None,
credential_uuid=None,
status="pending",
attempt_count=0,
max_attempts=5,
)
base.update(overrides)
return SimpleNamespace(**base)
def _mock_httpx(*, raise_request_error=None, status_error=None, status_code=200):
"""Patch target for httpx.AsyncClient used by the delivery task."""
response = MagicMock()
response.status_code = status_code
response.text = "body"
if status_error is not None:
response.raise_for_status = MagicMock(side_effect=status_error)
else:
response.raise_for_status = MagicMock()
async def _request(**kwargs):
if raise_request_error is not None:
raise raise_request_error
return response
client = MagicMock()
client.request = AsyncMock(side_effect=_request)
ctx = MagicMock()
ctx.__aenter__ = AsyncMock(return_value=client)
ctx.__aexit__ = AsyncMock(return_value=False)
return MagicMock(return_value=ctx)
def _delivery_db(delivery):
db = MagicMock()
# The task claims the delivery atomically before sending; a successful claim
# returns the row.
db.claim_webhook_delivery = AsyncMock(return_value=delivery)
db.get_webhook_delivery = AsyncMock(return_value=delivery)
db.get_credential_by_uuid = AsyncMock(return_value=None)
db.mark_webhook_delivery_succeeded = AsyncMock()
db.schedule_webhook_delivery_retry = AsyncMock()
db.mark_webhook_delivery_dead_letter = AsyncMock()
return db
@pytest.mark.asyncio
async def test_deliver_webhook_success():
delivery = _fake_delivery()
db = _delivery_db(delivery)
with (
patch("api.tasks.webhook_delivery.db_client", db),
patch("api.tasks.webhook_delivery.httpx.AsyncClient", _mock_httpx()),
):
await deliver_webhook(None, delivery.id)
db.mark_webhook_delivery_succeeded.assert_awaited_once_with(1, 1, 200)
db.schedule_webhook_delivery_retry.assert_not_called()
db.mark_webhook_delivery_dead_letter.assert_not_called()
@pytest.mark.asyncio
async def test_deliver_webhook_transient_error_schedules_retry():
delivery = _fake_delivery(attempt_count=0)
db = _delivery_db(delivery)
enqueue = AsyncMock()
with (
patch("api.tasks.webhook_delivery.db_client", db),
patch(
"api.tasks.webhook_delivery.httpx.AsyncClient",
_mock_httpx(raise_request_error=httpx.ConnectTimeout("timed out")),
),
patch("api.tasks.arq.enqueue_job", enqueue),
):
await deliver_webhook(None, delivery.id)
db.schedule_webhook_delivery_retry.assert_awaited_once()
assert db.schedule_webhook_delivery_retry.call_args.kwargs["attempt_count"] == 1
db.mark_webhook_delivery_dead_letter.assert_not_called()
# Re-enqueued with a deferral and the next attempt's job id.
enqueue.assert_awaited_once()
assert enqueue.call_args.kwargs["_job_id"] == "webhook-delivery-1-1"
assert enqueue.call_args.kwargs["_defer_by"] > 0
@pytest.mark.asyncio
async def test_deliver_webhook_permanent_4xx_dead_letters():
delivery = _fake_delivery()
db = _delivery_db(delivery)
resp = MagicMock(status_code=401, text="Unauthorized")
status_error = httpx.HTTPStatusError("401", request=MagicMock(), response=resp)
with (
patch("api.tasks.webhook_delivery.db_client", db),
patch(
"api.tasks.webhook_delivery.httpx.AsyncClient",
_mock_httpx(status_error=status_error, status_code=401),
),
):
await deliver_webhook(None, delivery.id)
db.mark_webhook_delivery_dead_letter.assert_awaited_once()
db.schedule_webhook_delivery_retry.assert_not_called()
@pytest.mark.asyncio
async def test_deliver_webhook_retryable_5xx_schedules_retry():
delivery = _fake_delivery()
db = _delivery_db(delivery)
enqueue = AsyncMock()
resp = MagicMock(status_code=503, text="unavailable")
status_error = httpx.HTTPStatusError("503", request=MagicMock(), response=resp)
with (
patch("api.tasks.webhook_delivery.db_client", db),
patch(
"api.tasks.webhook_delivery.httpx.AsyncClient",
_mock_httpx(status_error=status_error, status_code=503),
),
patch("api.tasks.arq.enqueue_job", enqueue),
):
await deliver_webhook(None, delivery.id)
db.schedule_webhook_delivery_retry.assert_awaited_once()
db.mark_webhook_delivery_dead_letter.assert_not_called()
@pytest.mark.asyncio
async def test_deliver_webhook_exhausted_attempts_dead_letters():
# attempt_count=4 -> this is attempt 5 == max_attempts, so no further retry.
delivery = _fake_delivery(attempt_count=4, max_attempts=5)
db = _delivery_db(delivery)
with (
patch("api.tasks.webhook_delivery.db_client", db),
patch(
"api.tasks.webhook_delivery.httpx.AsyncClient",
_mock_httpx(raise_request_error=httpx.ConnectError("boom")),
),
):
await deliver_webhook(None, delivery.id)
db.mark_webhook_delivery_dead_letter.assert_awaited_once()
assert db.mark_webhook_delivery_dead_letter.call_args.args[1] == 5
db.schedule_webhook_delivery_retry.assert_not_called()
@pytest.mark.asyncio
async def test_deliver_webhook_no_op_when_claim_fails():
# The atomic claim returns None when the delivery is not pending/due or was
# already claimed by a concurrent worker -> no send, no double-fire.
delivery = _fake_delivery(status="succeeded")
db = _delivery_db(delivery)
db.claim_webhook_delivery = AsyncMock(return_value=None)
httpx_mock = _mock_httpx()
with (
patch("api.tasks.webhook_delivery.db_client", db),
patch("api.tasks.webhook_delivery.httpx.AsyncClient", httpx_mock),
):
await deliver_webhook(None, delivery.id)
httpx_mock.assert_not_called()
db.mark_webhook_delivery_succeeded.assert_not_called()
db.mark_webhook_delivery_dead_letter.assert_not_called()
@pytest.mark.asyncio
async def test_deliver_webhook_delivered_but_record_failure_does_not_dead_letter():
# If the HTTP POST is accepted (2xx) but recording success fails (DB blip),
# the row must NOT be dead-lettered -- it stays pending for the sweeper to
# reconcile (the receiver dedups the re-send via X-Dograh-Delivery-Id).
delivery = _fake_delivery()
db = _delivery_db(delivery)
db.mark_webhook_delivery_succeeded = AsyncMock(
side_effect=RuntimeError("db connection blip")
)
with (
patch("api.tasks.webhook_delivery.db_client", db),
patch("api.tasks.webhook_delivery.httpx.AsyncClient", _mock_httpx()),
):
await deliver_webhook(None, delivery.id)
db.mark_webhook_delivery_succeeded.assert_awaited_once()
db.mark_webhook_delivery_dead_letter.assert_not_called()
db.schedule_webhook_delivery_retry.assert_not_called()