From fd0d144b0831c120b903baebb5d01c2870f7ab5f Mon Sep 17 00:00:00 2001 From: Tararais <90864951+xTararAisx@users.noreply.github.com> Date: Thu, 2 Jul 2026 17:14:14 +0100 Subject: [PATCH] feat(webhooks): durable retrying delivery for final webhooks (#478) * feat(webhooks): durable retrying delivery for final webhooks Final webhook nodes were fired inline with a single best-effort httpx POST (run_integrations._execute_webhook_node). On a transient error the failure was swallowed at three levels, so ARQ never retried and the final call report was permanently lost -- leaving downstream receivers stuck (e.g. a CRM showing a call as still "in conversation"). Replace the one-shot POST with a durable, idempotent delivery pipeline modelled on the campaign retry pattern (persisted row + scheduled_for + bounded attempts): - New webhook_deliveries table (WebhookDeliveryModel) is the source of truth. Payload is rendered once and frozen so retries are deterministic; secrets are not stored -- the credential is referenced by uuid and re-resolved at send time. - run_integrations now persists a delivery row and enqueues deliver_webhook with a deterministic ARQ job id instead of sending inline. - deliver_webhook (new ARQ task) sends the request and: * 2xx -> succeeded * transient -> retry with capped exponential backoff (RequestError / 5xx / 408 / 425 / 429), up to max_attempts then dead_letter * permanent 4xx -> dead_letter immediately (no pointless looping) It is idempotent: a non-pending delivery is a no-op, so a duplicate enqueue or sweeper re-injection can't double-send. - sweep_webhook_deliveries cron (every 5 min) re-enqueues overdue pending deliveries so nothing is lost to a worker restart / Redis flush. - Stable X-Dograh-Delivery-Id / Workflow-Run-Id / Attempt headers let receivers dedupe retried deliveries. - enqueue_job now forwards ARQ job options (_job_id, _defer_by); failures log repr(e) so empty-message errors like ConnectTimeout are diagnosable. Config via DEFAULT_WEBHOOK_DELIVERY_CONFIG (env-overridable): max_attempts=5, base_delay=30s, max_delay=600s, timeout=30s. Tests cover payload rendering, persist+enqueue, success, transient retry, retryable 5xx, permanent 4xx dead-letter, attempt exhaustion, and idempotency. Migration verified to apply/rollback against Postgres; table/enum/indexes confirmed. * fix(webhooks): atomic claim, safe success-recording, sweep paging, migration cleanup Address review feedback on the webhook delivery pipeline: - deliver_webhook now atomically claims a delivery (conditional UPDATE that leases scheduled_for) before sending, so concurrent ARQ executions can't double-send (the prior status=='pending' read was non-atomic). - Recording success is moved out of the dead-letter try-block: if the receiver accepted the payload (2xx) but the success DB-write fails, the row is left pending for the sweeper to reconcile instead of being dead-lettered. - The sweep keyset-paginates by id so a backlog over the page size is fully drained, and logs the true re-enqueued total. - Migration downgrade drops the enum via op.execute(DROP TYPE IF EXISTS ...) instead of the deprecated op.get_bind(). * fix(webhooks): idempotent delivery creation and drop secret custom headers Address the remaining review feedback: - Add a (workflow_run_id, webhook_node_id) unique constraint and make create_webhook_delivery a get-or-create returning (delivery, created). A retried run_integrations now reuses the existing row instead of creating and sending a duplicate final webhook; only a freshly-created row is enqueued. - Stop persisting secret-looking custom headers (Authorization, X-API-Key, Cookie, ...) in plaintext on the delivery row: they are dropped with a warning pointing at the credential store (which is re-resolved securely at send time). Non-secret custom headers are unaffected. * fix(webhooks): harden idempotency key, secret-header match, sweep reclaim id Address follow-up review feedback: - webhook_node_id is now NOT NULL so a NULL can't slip past the (workflow_run_id, webhook_node_id) unique constraint and create duplicates. - Secret-header filtering matches normalized markers (auth/token/secret/cookie/ api-key/...) instead of an exact name list, catching variants like X-Custom-Auth-Token while leaving benign headers (e.g. X-Idempotency-Key). - The sweeper re-enqueues with a reclaim-specific job id (the lease timestamp) so reconciling a delivered-but-unrecorded row isn't deduped against the original attempt's already-completed ARQ job. The atomic claim still ensures at most one send. * fix(webhooks): scope delivery rows to workflow org --------- Co-authored-by: Abhishek Kumar --- .../b7e3c9a1d2f4_add_webhook_deliveries.py | 119 ++++ api/constants.py | 12 + api/db/db_client.py | 3 + api/db/models.py | 97 ++++ api/db/webhook_delivery_client.py | 244 +++++++++ api/db/workflow_run_client.py | 4 +- api/schemas/tool.py | 1 + api/tasks/arq.py | 21 +- api/tasks/function_names.py | 1 + api/tasks/run_integrations.py | 218 +++++--- api/tasks/webhook_delivery.py | 266 +++++++++ api/tests/test_run_integrations_webhook.py | 515 ++++++++++++++++-- 12 files changed, 1360 insertions(+), 141 deletions(-) create mode 100644 api/alembic/versions/b7e3c9a1d2f4_add_webhook_deliveries.py create mode 100644 api/db/webhook_delivery_client.py create mode 100644 api/tasks/webhook_delivery.py diff --git a/api/alembic/versions/b7e3c9a1d2f4_add_webhook_deliveries.py b/api/alembic/versions/b7e3c9a1d2f4_add_webhook_deliveries.py new file mode 100644 index 00000000..599b6287 --- /dev/null +++ b/api/alembic/versions/b7e3c9a1d2f4_add_webhook_deliveries.py @@ -0,0 +1,119 @@ +"""add webhook_deliveries + +Durable, retrying outbound webhook delivery so a transient network error can't +permanently drop a workflow's final webhook. + +Revision ID: b7e3c9a1d2f4 +Revises: 91cc6ba3e1c7 +Create Date: 2026-06-28 19:40:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "b7e3c9a1d2f4" +down_revision: Union[str, None] = "91cc6ba3e1c7" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "webhook_deliveries", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("delivery_uuid", sa.String(length=36), nullable=False), + sa.Column("workflow_run_id", sa.Integer(), nullable=False), + sa.Column("organization_id", sa.Integer(), nullable=False), + sa.Column("webhook_name", sa.String(), nullable=True), + sa.Column("endpoint_url", sa.String(), nullable=False), + sa.Column( + "http_method", + sa.String(), + nullable=False, + ), + sa.Column( + "payload", + sa.JSON(), + nullable=False, + ), + sa.Column("custom_headers", sa.JSON(), nullable=True), + sa.Column("credential_uuid", sa.String(length=36), nullable=True), + sa.Column("webhook_node_id", sa.String(), nullable=False), + sa.Column( + "status", + sa.Enum( + "pending", + "succeeded", + "dead_letter", + name="webhook_delivery_status", + ), + server_default=sa.text("'pending'"), + nullable=False, + ), + sa.Column( + "attempt_count", + sa.Integer(), + server_default=sa.text("0"), + nullable=False, + ), + sa.Column( + "max_attempts", + sa.Integer(), + server_default=sa.text("5"), + nullable=False, + ), + sa.Column("scheduled_for", sa.DateTime(timezone=True), nullable=True), + sa.Column("last_status_code", sa.Integer(), nullable=True), + sa.Column("last_error", sa.Text(), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True), + sa.ForeignKeyConstraint( + ["workflow_run_id"], ["workflow_runs.id"], ondelete="CASCADE" + ), + sa.ForeignKeyConstraint( + ["organization_id"], ["organizations.id"], ondelete="CASCADE" + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint( + "workflow_run_id", + "webhook_node_id", + name="uq_webhook_deliveries_run_node", + ), + ) + op.create_index( + "ix_webhook_deliveries_delivery_uuid", + "webhook_deliveries", + ["delivery_uuid"], + unique=True, + ) + op.create_index( + "idx_webhook_deliveries_run", + "webhook_deliveries", + ["workflow_run_id"], + unique=False, + ) + # Partial index for the sweeper's hot path: due pending deliveries. + op.create_index( + "idx_webhook_deliveries_pending_scheduled", + "webhook_deliveries", + ["scheduled_for"], + unique=False, + postgresql_where=sa.text("status = 'pending'"), + ) + + +def downgrade() -> None: + op.drop_index( + "idx_webhook_deliveries_pending_scheduled", + table_name="webhook_deliveries", + ) + op.drop_index("idx_webhook_deliveries_run", table_name="webhook_deliveries") + op.drop_index( + "ix_webhook_deliveries_delivery_uuid", table_name="webhook_deliveries" + ) + op.drop_table("webhook_deliveries") + op.execute("DROP TYPE IF EXISTS webhook_delivery_status") diff --git a/api/constants.py b/api/constants.py index b38a6385..b2b5d24e 100644 --- a/api/constants.py +++ b/api/constants.py @@ -161,6 +161,18 @@ DEFAULT_CAMPAIGN_RETRY_CONFIG = { } +# Outbound webhook delivery: bounded retry with exponential backoff. +# Delivery is persisted (see WebhookDeliveryModel) and retried by an ARQ task so a +# transient network error can't permanently drop a final webhook. After +# ``max_attempts`` transient failures the delivery is parked as ``dead_letter``. +DEFAULT_WEBHOOK_DELIVERY_CONFIG = { + "max_attempts": int(os.getenv("WEBHOOK_DELIVERY_MAX_ATTEMPTS", 5)), + "base_delay_seconds": int(os.getenv("WEBHOOK_DELIVERY_BASE_DELAY_SECONDS", 30)), + "max_delay_seconds": int(os.getenv("WEBHOOK_DELIVERY_MAX_DELAY_SECONDS", 600)), + "timeout_seconds": int(os.getenv("WEBHOOK_DELIVERY_TIMEOUT_SECONDS", 30)), +} + + # Circuit breaker defaults for campaign call failure detection DEFAULT_CIRCUIT_BREAKER_CONFIG = { "enabled": True, diff --git a/api/db/db_client.py b/api/db/db_client.py index 15d1c108..d7e39afa 100644 --- a/api/db/db_client.py +++ b/api/db/db_client.py @@ -14,6 +14,7 @@ from api.db.telephony_phone_number_client import TelephonyPhoneNumberClient from api.db.tool_client import ToolClient from api.db.user_client import UserClient from api.db.webhook_credential_client import WebhookCredentialClient +from api.db.webhook_delivery_client import WebhookDeliveryClient from api.db.workflow_client import WorkflowClient from api.db.workflow_recording_client import WorkflowRecordingClient from api.db.workflow_run_client import WorkflowRunClient @@ -37,6 +38,7 @@ class DBClient( EmbedTokenClient, AgentTriggerClient, WebhookCredentialClient, + WebhookDeliveryClient, ToolClient, KnowledgeBaseClient, WorkflowRecordingClient, @@ -62,6 +64,7 @@ class DBClient( - EmbedTokenClient: handles embed token and session operations - AgentTriggerClient: handles agent trigger operations for API-based call triggering - WebhookCredentialClient: handles webhook credential operations + - WebhookDeliveryClient: handles durable outbound webhook delivery records - ToolClient: handles tool operations for reusable HTTP API tools - KnowledgeBaseClient: handles knowledge base document and vector search operations - FolderClient: handles folder operations for grouping workflows (agents) diff --git a/api/db/models.py b/api/db/models.py index d2cfc42f..66837724 100644 --- a/api/db/models.py +++ b/api/db/models.py @@ -1022,6 +1022,103 @@ class ExternalCredentialModel(Base): ) +class WebhookDeliveryModel(Base): + """Durable record of an outbound webhook delivery attempt. + + Final webhooks (e.g. a workflow's "Final Webhook" node) must not be lost to a + single transient network error. Instead of firing the HTTP request inline and + forgetting it, we persist one row per webhook node per workflow run and let an + ARQ task drive delivery with bounded, backed-off retries. The row is the source + of truth: it survives worker restarts and a periodic sweeper re-enqueues any + ``pending`` delivery whose ``scheduled_for`` is overdue. After ``max_attempts`` + transient failures (or on a permanent 4xx) the row is parked as ``dead_letter`` + for inspection rather than retried forever. + + Mirrors the campaign retry pattern (``QueuedRunModel``): persisted state, + ``scheduled_for`` gating, a hard attempt ceiling, and a terminal failure state. + """ + + __tablename__ = "webhook_deliveries" + + id = Column(Integer, primary_key=True, index=True) + + # Stable idempotency key sent to the receiver so it can dedupe retries. + delivery_uuid = Column( + String(36), + unique=True, + nullable=False, + index=True, + default=lambda: str(uuid.uuid4()), + ) + + workflow_run_id = Column( + Integer, + ForeignKey("workflow_runs.id", ondelete="CASCADE"), + nullable=False, + ) + organization_id = Column( + Integer, ForeignKey("organizations.id", ondelete="CASCADE"), nullable=False + ) + + # Frozen request definition. The payload is rendered once at enqueue time so + # retries are deterministic. Secrets are NOT stored here: the auth header is + # re-resolved from ``credential_uuid`` at send time (honours rotation/revocation). + webhook_name = Column(String, nullable=True) + endpoint_url = Column(String, nullable=False) + http_method = Column(String, nullable=False, default="POST") + payload = Column(JSON, nullable=False, default=dict) + custom_headers = Column(JSON, nullable=True) + credential_uuid = Column(String(36), nullable=True) + + # Workflow node that produced this delivery. Combined with workflow_run_id it + # is the per-run/per-node idempotency key, so a retried run_integrations does + # not create (and send) a duplicate delivery for the same node. Non-nullable: + # a NULL would be distinct under the unique constraint and defeat the dedupe. + webhook_node_id = Column(String, nullable=False) + + status = Column( + Enum( + "pending", + "succeeded", + "dead_letter", + name="webhook_delivery_status", + ), + nullable=False, + default="pending", + server_default="pending", + ) + attempt_count = Column(Integer, nullable=False, default=0, server_default=text("0")) + max_attempts = Column(Integer, nullable=False, default=5, server_default=text("5")) + # When the next attempt becomes due. NULL once terminal (succeeded/dead_letter). + scheduled_for = Column(DateTime(timezone=True), nullable=True) + + last_status_code = Column(Integer, nullable=True) + last_error = Column(Text, nullable=True) + + created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(UTC)) + updated_at = Column( + DateTime(timezone=True), + default=lambda: datetime.now(UTC), + onupdate=lambda: datetime.now(UTC), + ) + + __table_args__ = ( + # Sweeper lookup: due pending deliveries. + Index( + "idx_webhook_deliveries_pending_scheduled", + "scheduled_for", + postgresql_where=text("status = 'pending'"), + ), + Index("idx_webhook_deliveries_run", "workflow_run_id"), + # Per-run/per-node idempotency: one delivery per webhook node per run. + UniqueConstraint( + "workflow_run_id", + "webhook_node_id", + name="uq_webhook_deliveries_run_node", + ), + ) + + class ToolModel(Base): """Model for storing reusable tools that can be invoked during workflows. diff --git a/api/db/webhook_delivery_client.py b/api/db/webhook_delivery_client.py new file mode 100644 index 00000000..a06a5be9 --- /dev/null +++ b/api/db/webhook_delivery_client.py @@ -0,0 +1,244 @@ +"""Database client for durable outbound webhook deliveries. + +Persists one row per webhook node per workflow run and exposes the state +transitions the delivery task and sweeper need: create (pending), succeed, +schedule the next retry, and park as dead-letter. Mirrors the campaign retry +pattern -- the row is the source of truth, ``scheduled_for`` gates due work. +""" + +from datetime import UTC, datetime, timedelta +from typing import List, Optional, Tuple + +from loguru import logger +from sqlalchemy import or_, select, update +from sqlalchemy.exc import IntegrityError + +from api.db.base_client import BaseDBClient +from api.db.models import WebhookDeliveryModel, WorkflowModel, WorkflowRunModel + + +class WebhookDeliveryClient(BaseDBClient): + """Client for managing persisted webhook delivery records.""" + + async def create_webhook_delivery( + self, + workflow_run_id: int, + organization_id: int, + endpoint_url: str, + payload: dict, + max_attempts: int, + http_method: str = "POST", + webhook_name: Optional[str] = None, + custom_headers: Optional[list] = None, + credential_uuid: Optional[str] = None, + webhook_node_id: Optional[str] = None, + scheduled_for: Optional[datetime] = None, + ) -> Tuple[WebhookDeliveryModel, bool]: + """Get-or-create the ``pending`` delivery for this run + webhook node. + + Idempotent on ``(workflow_run_id, webhook_node_id)``: a retried + ``run_integrations`` returns the existing row instead of creating (and + sending) a duplicate. Returns ``(delivery, created)`` so the caller only + enqueues a send for a freshly-created row. + """ + async with self.async_session() as session: + run_scope_result = await session.execute( + select(WorkflowRunModel.id, WorkflowModel.organization_id) + .join(WorkflowModel, WorkflowRunModel.workflow_id == WorkflowModel.id) + .where(WorkflowRunModel.id == workflow_run_id) + ) + run_scope = run_scope_result.one_or_none() + if run_scope is None: + raise ValueError(f"Workflow run {workflow_run_id} not found") + + _, run_organization_id = run_scope + if run_organization_id is None: + raise ValueError( + f"Workflow run {workflow_run_id} is not associated with an organization" + ) + if run_organization_id != organization_id: + raise ValueError( + f"Workflow run {workflow_run_id} belongs to organization " + f"{run_organization_id}, not {organization_id}" + ) + + delivery = WebhookDeliveryModel( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + webhook_name=webhook_name, + webhook_node_id=webhook_node_id, + endpoint_url=endpoint_url, + http_method=http_method, + payload=payload, + custom_headers=custom_headers, + credential_uuid=credential_uuid, + max_attempts=max_attempts, + status="pending", + attempt_count=0, + scheduled_for=scheduled_for or datetime.now(UTC), + ) + session.add(delivery) + try: + await session.commit() + except IntegrityError: + await session.rollback() + existing = await session.execute( + select(WebhookDeliveryModel).where( + WebhookDeliveryModel.workflow_run_id == workflow_run_id, + WebhookDeliveryModel.webhook_node_id == webhook_node_id, + ) + ) + row = existing.scalar_one_or_none() + if row is not None: + return row, False + # The violation was not the run+node uniqueness -- re-raise. + raise + await session.refresh(delivery) + return delivery, True + + async def get_webhook_delivery( + self, delivery_id: int + ) -> Optional[WebhookDeliveryModel]: + async with self.async_session() as session: + result = await session.execute( + select(WebhookDeliveryModel).where( + WebhookDeliveryModel.id == delivery_id + ) + ) + return result.scalar_one_or_none() + + async def claim_webhook_delivery( + self, delivery_id: int, lease_seconds: int + ) -> Optional[WebhookDeliveryModel]: + """Atomically claim a pending, due delivery for one worker to process. + + A conditional UPDATE pushes ``scheduled_for`` out by a short lease. Only + one concurrent worker can win -- the others re-evaluate the WHERE after + the first commits, see the future ``scheduled_for``, match nothing, and + get ``None``. This prevents the non-atomic ``status == 'pending'`` read + from letting two workers double-send the same delivery. If the winning + worker crashes mid-send, the lease expires and the sweeper re-enqueues it. + + Returns the claimed row, or ``None`` if it was not claimable (already + claimed, not pending, or not yet due). + """ + now = datetime.now(UTC) + lease_until = now + timedelta(seconds=lease_seconds) + async with self.async_session() as session: + result = await session.execute( + update(WebhookDeliveryModel) + .where( + WebhookDeliveryModel.id == delivery_id, + WebhookDeliveryModel.status == "pending", + or_( + WebhookDeliveryModel.scheduled_for.is_(None), + WebhookDeliveryModel.scheduled_for <= now, + ), + ) + .values(scheduled_for=lease_until, updated_at=now) + ) + await session.commit() + if result.rowcount == 0: + return None + fetched = await session.execute( + select(WebhookDeliveryModel).where( + WebhookDeliveryModel.id == delivery_id + ) + ) + return fetched.scalar_one_or_none() + + async def mark_webhook_delivery_succeeded( + self, delivery_id: int, attempt_count: int, status_code: Optional[int] + ) -> None: + async with self.async_session() as session: + await session.execute( + update(WebhookDeliveryModel) + .where(WebhookDeliveryModel.id == delivery_id) + .values( + status="succeeded", + attempt_count=attempt_count, + last_status_code=status_code, + last_error=None, + scheduled_for=None, + updated_at=datetime.now(UTC), + ) + ) + await session.commit() + + async def schedule_webhook_delivery_retry( + self, + delivery_id: int, + attempt_count: int, + scheduled_for: datetime, + last_error: str, + last_status_code: Optional[int], + ) -> None: + """Record a transient failure and set when the next attempt is due.""" + async with self.async_session() as session: + await session.execute( + update(WebhookDeliveryModel) + .where(WebhookDeliveryModel.id == delivery_id) + .values( + status="pending", + attempt_count=attempt_count, + scheduled_for=scheduled_for, + last_error=last_error[:2000] if last_error else last_error, + last_status_code=last_status_code, + updated_at=datetime.now(UTC), + ) + ) + await session.commit() + + async def mark_webhook_delivery_dead_letter( + self, + delivery_id: int, + attempt_count: int, + last_error: str, + last_status_code: Optional[int], + ) -> None: + """Terminal failure: parked for inspection, never retried again.""" + async with self.async_session() as session: + await session.execute( + update(WebhookDeliveryModel) + .where(WebhookDeliveryModel.id == delivery_id) + .values( + status="dead_letter", + attempt_count=attempt_count, + last_error=last_error[:2000] if last_error else last_error, + last_status_code=last_status_code, + scheduled_for=None, + updated_at=datetime.now(UTC), + ) + ) + await session.commit() + logger.warning( + f"Webhook delivery {delivery_id} dead-lettered after " + f"{attempt_count} attempts: {last_error}" + ) + + async def get_due_webhook_deliveries( + self, now: Optional[datetime] = None, limit: int = 100, after_id: int = 0 + ) -> List[WebhookDeliveryModel]: + """One page of pending deliveries whose next attempt is due. + + Used by the periodic sweeper to re-enqueue deliveries whose ARQ job was + lost (worker restart, Redis flush). The delivery task is idempotent, so a + spurious re-enqueue is harmless. Ordered by ``id`` and gated on + ``after_id`` for keyset pagination -- re-enqueuing does not change a row's + due state, so the sweeper pages by id to drain the whole backlog instead + of re-reading the same first page forever. + """ + cutoff = now or datetime.now(UTC) + async with self.async_session() as session: + result = await session.execute( + select(WebhookDeliveryModel) + .where( + WebhookDeliveryModel.status == "pending", + WebhookDeliveryModel.scheduled_for.isnot(None), + WebhookDeliveryModel.scheduled_for <= cutoff, + WebhookDeliveryModel.id > after_id, + ) + .order_by(WebhookDeliveryModel.id) + .limit(limit) + ) + return list(result.scalars().all()) diff --git a/api/db/workflow_run_client.py b/api/db/workflow_run_client.py index 0a12f435..38be831c 100644 --- a/api/db/workflow_run_client.py +++ b/api/db/workflow_run_client.py @@ -438,10 +438,10 @@ class WorkflowRunClient(BaseDBClient): if not workflow_run: return None, None - if not workflow_run.workflow or not workflow_run.workflow.user: + if not workflow_run.workflow: return workflow_run, None - organization_id = workflow_run.workflow.user.selected_organization_id + organization_id = workflow_run.workflow.organization_id return workflow_run, organization_id async def ensure_public_access_token(self, workflow_run_id: int) -> Optional[str]: diff --git a/api/schemas/tool.py b/api/schemas/tool.py index 1fa55c69..82f23f51 100644 --- a/api/schemas/tool.py +++ b/api/schemas/tool.py @@ -205,6 +205,7 @@ class TransferCallConfig(BaseModel): description="Maximum seconds to wait for the destination to answer.", ) + class McpToolConfig(BaseModel): """Configuration for a customer MCP server tool definition.""" diff --git a/api/tasks/arq.py b/api/tasks/arq.py index 442114e6..01815b7b 100644 --- a/api/tasks/arq.py +++ b/api/tasks/arq.py @@ -12,7 +12,7 @@ from api.tasks.function_names import FunctionNames setup_logging() # Now import ARQ and task dependencies -from arq import create_pool +from arq import create_pool, cron from arq.connections import ArqRedis, RedisSettings parsed_url = urlparse(REDIS_URL) @@ -46,6 +46,7 @@ from api.tasks.campaign_tasks import ( from api.tasks.knowledge_base_processing import process_knowledge_base_document from api.tasks.run_integrations import run_integrations_post_workflow_run from api.tasks.s3_upload import upload_voicemail_audio_to_s3 +from api.tasks.webhook_delivery import deliver_webhook, sweep_webhook_deliveries from api.tasks.workflow_completion import process_workflow_completion @@ -57,8 +58,18 @@ class WorkerSettings: sync_campaign_source, process_campaign_batch, process_knowledge_base_document, + deliver_webhook, + ] + cron_jobs = [ + # Safety net for webhook deliveries whose ARQ job was lost (worker + # restart / Redis flush): re-enqueue any pending delivery that is overdue. + cron( + sweep_webhook_deliveries, + minute=set(range(0, 60, 5)), + second=0, + run_at_startup=True, + ), ] - cron_jobs = [] redis_settings = REDIS_SETTINGS max_jobs = 10 @@ -107,6 +118,8 @@ async def get_arq_redis() -> ArqRedis: return _redis_pool -async def enqueue_job(function_name: FunctionNames, *args): +async def enqueue_job(function_name: FunctionNames, *args, **kwargs): redis = await get_arq_redis() - await redis.enqueue_job(function_name, *args) + # kwargs forwards ARQ job options (e.g. _job_id, _defer_by) used for + # deterministic, backed-off webhook delivery retries. + return await redis.enqueue_job(function_name, *args, **kwargs) diff --git a/api/tasks/function_names.py b/api/tasks/function_names.py index 1a6bce95..599555b5 100644 --- a/api/tasks/function_names.py +++ b/api/tasks/function_names.py @@ -5,3 +5,4 @@ class FunctionNames: SYNC_CAMPAIGN_SOURCE = "sync_campaign_source" PROCESS_CAMPAIGN_BATCH = "process_campaign_batch" PROCESS_KNOWLEDGE_BASE_DOCUMENT = "process_knowledge_base_document" + DELIVER_WEBHOOK = "deliver_webhook" diff --git a/api/tasks/run_integrations.py b/api/tasks/run_integrations.py index db23bec2..3b88db5c 100644 --- a/api/tasks/run_integrations.py +++ b/api/tasks/run_integrations.py @@ -1,15 +1,15 @@ """Execute integrations (QA analysis, webhooks) after workflow run completion.""" import random +from datetime import UTC, datetime from typing import Any, Dict, Optional -import httpx from loguru import logger from pipecat.utils.enums import EndTaskReason from pipecat.utils.run_context import set_current_org_id, set_current_run_id from pydantic import ValidationError -from api.constants import BACKEND_API_ENDPOINT +from api.constants import BACKEND_API_ENDPOINT, DEFAULT_WEBHOOK_DELIVERY_CONFIG from api.db import db_client from api.db.models import WorkflowRunModel from api.enums import OrganizationConfigurationKey @@ -26,7 +26,7 @@ from api.services.workflow.dto import ( WebhookRFNode, ) from api.services.workflow.qa import run_per_node_qa_analysis -from api.utils.credential_auth import build_auth_header +from api.tasks.function_names import FunctionNames from api.utils.recording_artifacts import get_recording_storage_key from api.utils.template_renderer import render_template @@ -315,13 +315,15 @@ async def run_integrations_post_workflow_run(_ctx, workflow_run_id: int): webhook_data = webhook_node.data try: - await _execute_webhook_node( + await _enqueue_webhook_delivery( webhook_data=webhook_data, render_context=render_context, organization_id=organization_id, + workflow_run_id=workflow_run_id, + webhook_node_id=str(node_id), ) except Exception as e: - logger.warning(f"Failed to execute webhook '{webhook_data.name}': {e}") + logger.warning(f"Failed to enqueue webhook '{webhook_data.name}': {e}") except Exception as e: logger.error(f"Error running integrations: {e}", exc_info=True) @@ -387,98 +389,140 @@ def _build_render_context( return context -async def _execute_webhook_node( - webhook_data: WebhookNodeData, - render_context: Dict[str, Any], - organization_id: int, -) -> bool: +def _build_webhook_payload( + webhook_data: WebhookNodeData, render_context: Dict[str, Any] +) -> Any: + """Render the webhook payload once, so retries are deterministic. + + Always surfaces the call disposition on the outgoing payload, even when the + template author didn't reference it. Fill only if absent so a template that + sets it explicitly keeps its own value. """ - Execute a single webhook node. - - Args: - webhook_data: The validated webhook node data - render_context: Context for template rendering - organization_id: For credential lookup - - Returns: - True if successful, False otherwise - """ - webhook_name = webhook_data.name - - if not webhook_data.enabled: - logger.debug(f"Webhook '{webhook_name}' is disabled, skipping") - return True - - url = webhook_data.endpoint_url - if not url: - logger.warning(f"Webhook '{webhook_name}' has no endpoint URL") - return False - - headers = {"Content-Type": "application/json"} - - credential_uuid = webhook_data.credential_uuid - if credential_uuid: - credential = await db_client.get_credential_by_uuid( - credential_uuid, organization_id - ) - if credential: - auth_header = build_auth_header(credential) - headers.update(auth_header) - logger.debug(f"Applied credential '{credential.name}' to webhook") - else: - logger.warning( - f"Credential {credential_uuid} not found for webhook '{webhook_name}'" - ) - - for h in webhook_data.custom_headers or []: - if h.key and h.value: - headers[h.key] = h.value - payload = render_template(webhook_data.payload_template or {}, render_context) - # Always surface the call disposition on the outgoing payload, even when the - # template author didn't reference it. Fill only if absent so a template that - # sets it explicitly keeps its own value. if isinstance(payload, dict): gathered_context = render_context.get("gathered_context") or {} payload.setdefault( "call_disposition", gathered_context.get("call_disposition", "") ) + return payload + + +# Substrings that mark a header as likely carrying a secret. Matched against the +# normalized key so variants are caught too (e.g. ``X-Custom-Auth-Token``, +# ``My-Api-Key``), not just exact names. Their values are NOT persisted on the +# delivery row (which would store them in plaintext); secrets belong in the +# credential store, re-resolved at send time. Bare "key" is intentionally absent +# to avoid dropping benign headers like ``X-Idempotency-Key``. +_SECRET_HEADER_MARKERS = ( + "authorization", + "auth", + "token", + "secret", + "password", + "passwd", + "cookie", + "credential", + "api-key", + "apikey", + "api_key", + "access-key", +) + + +def _looks_like_secret_header(key: str) -> bool: + normalized = key.strip().lower() + return any(marker in normalized for marker in _SECRET_HEADER_MARKERS) + + +def _safe_custom_headers( + webhook_data: WebhookNodeData, webhook_name: str +) -> list[dict]: + """Custom headers to persist, with secret-looking ones dropped. + + Persisting arbitrary header values would store credentials (Authorization, + X-API-Key, ...) in plaintext on the delivery row. Drop those and tell the + operator to use a credential instead. + """ + safe = [] + for h in webhook_data.custom_headers or []: + if not (h.key and h.value): + continue + if _looks_like_secret_header(h.key): + logger.warning( + f"Webhook '{webhook_name}' custom header '{h.key}' looks like a " + f"secret; it will not be stored or sent. Use a credential instead." + ) + continue + safe.append({"key": h.key, "value": h.value}) + return safe + + +async def _enqueue_webhook_delivery( + webhook_data: WebhookNodeData, + render_context: Dict[str, Any], + organization_id: int, + workflow_run_id: int, + webhook_node_id: str, +) -> None: + """Persist a durable delivery record and enqueue its first send attempt. + + The actual HTTP request is performed by the ``deliver_webhook`` task, which + retries transient failures with backoff and dead-letters exhausted/permanent + ones. This replaces the previous one-shot, best-effort inline POST that lost + the webhook entirely on a single network error. + + Idempotent on ``(workflow_run_id, webhook_node_id)``: a retried run reuses the + existing delivery row and does not enqueue a second send. + """ + webhook_name = webhook_data.name + + if not webhook_data.enabled: + logger.debug(f"Webhook '{webhook_name}' is disabled, skipping") + return + + url = webhook_data.endpoint_url + if not url: + logger.warning(f"Webhook '{webhook_name}' has no endpoint URL") + return + + payload = _build_webhook_payload(webhook_data, render_context) + + # Persist non-secret request definition. The credential is stored by reference + # (uuid) and re-resolved at send time so secrets never land in this row. + custom_headers = _safe_custom_headers(webhook_data, webhook_name) method = (webhook_data.http_method or "POST").upper() - logger.info(f"Executing webhook '{webhook_name}': {method}") + delivery, created = await db_client.create_webhook_delivery( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + endpoint_url=url, + payload=payload, + max_attempts=DEFAULT_WEBHOOK_DELIVERY_CONFIG["max_attempts"], + http_method=method, + webhook_name=webhook_name, + custom_headers=custom_headers or None, + credential_uuid=webhook_data.credential_uuid, + webhook_node_id=webhook_node_id, + ) - try: - async with httpx.AsyncClient() as client: - if method in ("POST", "PUT", "PATCH"): - response = await client.request( - method=method, - url=url, - json=payload, - headers=headers, - timeout=30.0, - ) - else: # GET, DELETE - response = await client.request( - method=method, - url=url, - headers=headers, - timeout=30.0, - ) - - response.raise_for_status() - logger.info(f"Webhook '{webhook_name}' succeeded: {response.status_code}") - return True - - except httpx.HTTPStatusError as e: - logger.warning( - f"Webhook '{webhook_name}' failed: {e.response.status_code} - {e.response.text[:200]}" + if not created: + logger.info( + f"Webhook '{webhook_name}' delivery already exists for run " + f"{workflow_run_id} node {webhook_node_id}; not re-enqueuing" ) - return False - except httpx.RequestError as e: - logger.warning(f"Webhook '{webhook_name}' request error: {e}") - return False - except Exception as e: - logger.error(f"Webhook '{webhook_name}' unexpected error: {e}") - return False + return + + # Lazy import avoids a circular import (arq imports this module at load time). + from api.tasks.arq import enqueue_job + + await enqueue_job( + FunctionNames.DELIVER_WEBHOOK, + delivery.id, + _job_id=f"webhook-delivery-{delivery.id}-0", + ) + logger.info( + f"Enqueued webhook '{webhook_name}' delivery {delivery.delivery_uuid} " + f"for run {workflow_run_id}" + ) diff --git a/api/tasks/webhook_delivery.py b/api/tasks/webhook_delivery.py new file mode 100644 index 00000000..c4692715 --- /dev/null +++ b/api/tasks/webhook_delivery.py @@ -0,0 +1,266 @@ +"""Durable, retrying delivery of outbound webhooks. + +A workflow's final webhook must survive a transient network error. Rather than +firing the HTTP POST inline and forgetting it, ``run_integrations`` persists a +``WebhookDeliveryModel`` row and enqueues :func:`deliver_webhook`. This task sends +the request and, on a *transient* failure, schedules the next attempt with +exponential backoff -- up to ``max_attempts``, after which the delivery is parked +as ``dead_letter`` for inspection. Permanent failures (most 4xx) dead-letter +immediately instead of looping. + +A periodic :func:`sweep_webhook_deliveries` cron re-enqueues any ``pending`` +delivery whose attempt is overdue, so deliveries survive worker restarts / lost +ARQ jobs. The DB row is the source of truth; this task is idempotent and only +acts on a delivery that is still ``pending``. +""" + +from datetime import UTC, datetime, timedelta +from typing import Optional + +import httpx +from loguru import logger +from pipecat.utils.run_context import set_current_run_id + +from api.constants import DEFAULT_WEBHOOK_DELIVERY_CONFIG +from api.db import db_client +from api.db.models import WebhookDeliveryModel +from api.tasks.function_names import FunctionNames +from api.utils.credential_auth import build_auth_header + +# HTTP statuses that are worth retrying even though the server answered. +_RETRYABLE_STATUS_CODES = {408, 425, 429, 500, 502, 503, 504} + + +def _delivery_job_id(delivery_id: int, attempt_count: int) -> str: + """Deterministic ARQ job id so duplicate enqueues (task re-enqueue + sweeper) + collapse to one job instead of double-sending.""" + return f"webhook-delivery-{delivery_id}-{attempt_count}" + + +def _backoff_seconds(attempt: int) -> int: + """Exponential backoff (capped) for the next attempt after `attempt` failures.""" + base = DEFAULT_WEBHOOK_DELIVERY_CONFIG["base_delay_seconds"] + cap = DEFAULT_WEBHOOK_DELIVERY_CONFIG["max_delay_seconds"] + return min(base * (2 ** (attempt - 1)), cap) + + +async def _enqueue_delivery( + delivery_id: int, + attempt_count: int, + defer_by: int = 0, + reclaim_token: Optional[int] = None, +): + """Enqueue a delivery attempt with a dedup-safe job id. + + The normal (task self-retry) path uses a deterministic id so a retry and a + sweeper pass for the *same* attempt collapse to one job. The sweeper passes a + ``reclaim_token`` (the lease timestamp) to get a distinct id, so reconciling a + delivered-but-unrecorded row is not deduped against the original attempt's + already-completed job. The atomic claim still guarantees at most one send. + """ + from api.tasks.arq import enqueue_job # lazy import avoids circular import + + if reclaim_token is not None: + job_id = f"webhook-delivery-reclaim-{delivery_id}-{reclaim_token}" + else: + job_id = _delivery_job_id(delivery_id, attempt_count) + + await enqueue_job( + FunctionNames.DELIVER_WEBHOOK, + delivery_id, + _job_id=job_id, + _defer_by=defer_by, + ) + + +async def _build_headers(delivery: WebhookDeliveryModel, attempt: int) -> dict: + """Assemble request headers, re-resolving credential auth at send time so + secrets are never persisted on the delivery row and rotation is honoured.""" + headers = {"Content-Type": "application/json"} + + if delivery.credential_uuid: + credential = await db_client.get_credential_by_uuid( + delivery.credential_uuid, delivery.organization_id + ) + if credential: + headers.update(build_auth_header(credential)) + else: + logger.warning( + f"Credential {delivery.credential_uuid} not found for webhook " + f"'{delivery.webhook_name}' (delivery {delivery.id})" + ) + + for h in delivery.custom_headers or []: + key, value = h.get("key"), h.get("value") + if key and value: + headers[key] = value + + # Stable idempotency signal so the receiver can dedupe retried deliveries. + headers["X-Dograh-Delivery-Id"] = delivery.delivery_uuid + headers["X-Dograh-Workflow-Run-Id"] = str(delivery.workflow_run_id) + headers["X-Dograh-Delivery-Attempt"] = str(attempt) + return headers + + +async def _handle_transient_failure( + delivery: WebhookDeliveryModel, + attempt: int, + error: str, + status_code: Optional[int], +) -> None: + """Schedule a backed-off retry, or dead-letter once attempts are exhausted.""" + if attempt >= delivery.max_attempts: + await db_client.mark_webhook_delivery_dead_letter( + delivery.id, attempt, error, status_code + ) + return + + delay = _backoff_seconds(attempt) + scheduled_for = datetime.now(UTC) + timedelta(seconds=delay) + await db_client.schedule_webhook_delivery_retry( + delivery_id=delivery.id, + attempt_count=attempt, + scheduled_for=scheduled_for, + last_error=error, + last_status_code=status_code, + ) + await _enqueue_delivery(delivery.id, attempt_count=attempt, defer_by=delay) + logger.warning( + f"Webhook '{delivery.webhook_name}' delivery {delivery.id} attempt {attempt} " + f"failed ({error}); retrying in {delay}s " + f"(attempt {attempt + 1}/{delivery.max_attempts})" + ) + + +async def deliver_webhook(_ctx, delivery_id: int) -> None: + """Send one webhook delivery attempt and record the outcome. + + Concurrency-safe: the delivery is atomically *claimed* before the HTTP + request (a conditional update only one worker can win), so a duplicate + enqueue or sweeper re-injection cannot double-send. A claim that returns + nothing means another worker owns it, or it is no longer pending/due -- a + no-op. + """ + # Lease long enough to outlast a full attempt so the sweeper does not reclaim + # a delivery that is still in flight. + lease_seconds = DEFAULT_WEBHOOK_DELIVERY_CONFIG["timeout_seconds"] + 60 + delivery = await db_client.claim_webhook_delivery(delivery_id, lease_seconds) + if delivery is None: + logger.debug( + f"Webhook delivery {delivery_id} not claimable " + f"(already claimed, not pending, or not yet due); skipping" + ) + return + + set_current_run_id(str(delivery.workflow_run_id)) + attempt = delivery.attempt_count + 1 + method = (delivery.http_method or "POST").upper() + timeout = DEFAULT_WEBHOOK_DELIVERY_CONFIG["timeout_seconds"] + + try: + headers = await _build_headers(delivery, attempt) + + async with httpx.AsyncClient() as client: + if method in ("POST", "PUT", "PATCH"): + response = await client.request( + method=method, + url=delivery.endpoint_url, + json=delivery.payload, + headers=headers, + timeout=timeout, + ) + else: # GET, DELETE + response = await client.request( + method=method, + url=delivery.endpoint_url, + headers=headers, + timeout=timeout, + ) + + response.raise_for_status() + except httpx.HTTPStatusError as e: + status_code = e.response.status_code + error = f"HTTP {status_code}: {e.response.text[:200]}" + if status_code in _RETRYABLE_STATUS_CODES: + await _handle_transient_failure(delivery, attempt, error, status_code) + else: + # Permanent (auth/validation/not-found): retrying won't help. Park it. + await db_client.mark_webhook_delivery_dead_letter( + delivery.id, attempt, error, status_code + ) + return + except httpx.RequestError as e: + # Connect/read timeouts, DNS, connection resets -- the transient class that + # previously lost the webhook entirely. str(e) is often empty, so use repr. + await _handle_transient_failure(delivery, attempt, repr(e), None) + return + except Exception as e: + # Unexpected (e.g. a bug): don't loop on it, surface as dead-letter. + logger.error( + f"Webhook '{delivery.webhook_name}' delivery {delivery.id} " + f"unexpected error: {e!r}" + ) + await db_client.mark_webhook_delivery_dead_letter( + delivery.id, attempt, repr(e), None + ) + return + + # The receiver accepted the payload (2xx). Recording success must NOT be able + # to dead-letter an already-delivered webhook: if this DB write fails, log and + # leave the row claimed-but-pending so the sweeper reconciles it once the + # lease expires (the receiver dedups the re-send via X-Dograh-Delivery-Id). + try: + await db_client.mark_webhook_delivery_succeeded( + delivery.id, attempt, response.status_code + ) + logger.info( + f"Webhook '{delivery.webhook_name}' delivery {delivery.id} succeeded: " + f"{response.status_code} (attempt {attempt})" + ) + except Exception as e: + logger.error( + f"Webhook '{delivery.webhook_name}' delivery {delivery.id} was " + f"delivered ({response.status_code}) but recording success failed; " + f"leaving it for the sweeper to reconcile after the lease expires: {e!r}" + ) + + +async def sweep_webhook_deliveries(_ctx) -> None: + """Safety net: re-enqueue pending deliveries whose attempt is overdue. + + Handles ARQ jobs lost to a worker restart or Redis flush. Re-enqueuing uses the + same deterministic job id, so if the original deferred job still exists this is a + no-op; it only re-injects genuinely lost work. ``deliver_webhook`` is idempotent. + """ + page_size = 100 + after_id = 0 + total = 0 + while True: + # Re-enqueuing does not change a row's due state, so we cannot page by + # re-querying the first rows (we'd loop on the same page). Page by id + # instead to drain the whole backlog -- e.g. after a prolonged outage. + due = await db_client.get_due_webhook_deliveries( + now=datetime.now(UTC), limit=page_size, after_id=after_id + ) + if not due: + break + for delivery in due: + # A reclaim token (the current lease timestamp) gives this a fresh job + # id so it is not deduped against the original attempt's completed job + # -- otherwise a delivered-but-unrecorded row could sit until ARQ's + # result retention clears. + reclaim_token = ( + int(delivery.scheduled_for.timestamp()) if delivery.scheduled_for else 0 + ) + await _enqueue_delivery( + delivery.id, + attempt_count=delivery.attempt_count, + reclaim_token=reclaim_token, + ) + total += len(due) + after_id = due[-1].id + if len(due) < page_size: + break + + if total: + logger.info(f"Webhook delivery sweep: re-enqueued {total} due deliveries") diff --git a/api/tests/test_run_integrations_webhook.py b/api/tests/test_run_integrations_webhook.py index 326e6905..d81736f0 100644 --- a/api/tests/test_run_integrations_webhook.py +++ b/api/tests/test_run_integrations_webhook.py @@ -1,32 +1,29 @@ +from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch +from uuid import uuid4 +import httpx import pytest +from api.db.models import ( + OrganizationModel, + UserModel, + WorkflowModel, + WorkflowRunModel, +) from api.services.workflow.dto import WebhookNodeData -from api.tasks.run_integrations import _execute_webhook_node +from api.tasks.run_integrations import ( + _build_webhook_payload, + _enqueue_webhook_delivery, +) +from api.tasks.webhook_delivery import deliver_webhook + +# --------------------------------------------------------------------------- +# Payload rendering (call_disposition injection) +# --------------------------------------------------------------------------- -def _mock_httpx_client(captured: dict): - """Build a patch target for httpx.AsyncClient that records the request kwargs.""" - response = MagicMock() - response.status_code = 200 - response.raise_for_status = MagicMock() - - async def _request(**kwargs): - captured.update(kwargs) - return response - - client = MagicMock() - client.request = AsyncMock(side_effect=_request) - - ctx = MagicMock() - ctx.__aenter__ = AsyncMock(return_value=client) - ctx.__aexit__ = AsyncMock(return_value=False) - return MagicMock(return_value=ctx) - - -@pytest.mark.asyncio -async def test_webhook_injects_disposition_when_absent(): +def test_build_webhook_payload_injects_disposition_when_absent(): """call_disposition is added to the payload when the template omits it.""" webhook = WebhookNodeData( name="Test Webhook", @@ -36,21 +33,12 @@ async def test_webhook_injects_disposition_when_absent(): ) render_context = {"gathered_context": {"call_disposition": "no-answer"}} - captured: dict = {} - with patch( - "api.tasks.run_integrations.httpx.AsyncClient", _mock_httpx_client(captured) - ): - ok = await _execute_webhook_node(webhook, render_context, organization_id=1) + payload = _build_webhook_payload(webhook, render_context) - assert ok is True - assert captured["json"] == { - "event": "call_done", - "call_disposition": "no-answer", - } + assert payload == {"event": "call_done", "call_disposition": "no-answer"} -@pytest.mark.asyncio -async def test_webhook_preserves_template_disposition(): +def test_build_webhook_payload_preserves_template_disposition(): """A disposition key set explicitly in the template is not overwritten.""" webhook = WebhookNodeData( name="Test Webhook", @@ -60,17 +48,12 @@ async def test_webhook_preserves_template_disposition(): ) render_context = {"gathered_context": {"call_disposition": "no-answer"}} - captured: dict = {} - with patch( - "api.tasks.run_integrations.httpx.AsyncClient", _mock_httpx_client(captured) - ): - await _execute_webhook_node(webhook, render_context, organization_id=1) + payload = _build_webhook_payload(webhook, render_context) - assert captured["json"]["call_disposition"] == "custom-from-template" + assert payload["call_disposition"] == "custom-from-template" -@pytest.mark.asyncio -async def test_webhook_injects_empty_disposition_when_context_missing(): +def test_build_webhook_payload_empty_disposition_when_context_missing(): """Missing gathered_context values fall back to an empty string, not omission.""" webhook = WebhookNodeData( name="Test Webhook", @@ -79,10 +62,446 @@ async def test_webhook_injects_empty_disposition_when_context_missing(): payload_template={}, ) - captured: dict = {} - with patch( - "api.tasks.run_integrations.httpx.AsyncClient", _mock_httpx_client(captured) - ): - await _execute_webhook_node(webhook, {}, organization_id=1) + payload = _build_webhook_payload(webhook, {}) - assert captured["json"] == {"call_disposition": ""} + assert payload == {"call_disposition": ""} + + +# --------------------------------------------------------------------------- +# Enqueue: persist a delivery row and schedule the first send +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_enqueue_webhook_delivery_persists_and_enqueues(): + created = SimpleNamespace(id=42, delivery_uuid="uuid-42") + db = MagicMock() + db.create_webhook_delivery = AsyncMock(return_value=(created, True)) + enqueue = AsyncMock() + + webhook = WebhookNodeData( + name="Final Webhook", + enabled=True, + endpoint_url="https://example.com/hook", + http_method="POST", + payload_template={"event": "call_done"}, + ) + + with ( + patch("api.tasks.run_integrations.db_client", db), + patch("api.tasks.arq.enqueue_job", enqueue), + ): + await _enqueue_webhook_delivery( + webhook_data=webhook, + render_context={"gathered_context": {"call_disposition": "user_hangup"}}, + organization_id=7, + workflow_run_id=9, + webhook_node_id="node-1", + ) + + db.create_webhook_delivery.assert_awaited_once() + kwargs = db.create_webhook_delivery.call_args.kwargs + assert kwargs["workflow_run_id"] == 9 + assert kwargs["organization_id"] == 7 + assert kwargs["endpoint_url"] == "https://example.com/hook" + assert kwargs["payload"]["call_disposition"] == "user_hangup" + assert kwargs["webhook_node_id"] == "node-1" + + enqueue.assert_awaited_once() + # Deterministic job id for the first attempt (dedup-safe). + assert enqueue.call_args.kwargs["_job_id"] == "webhook-delivery-42-0" + + +@pytest.mark.asyncio +async def test_enqueue_webhook_delivery_idempotent_does_not_reenqueue(): + # A retried run gets the existing row back (created=False) -> no second send. + existing = SimpleNamespace(id=42, delivery_uuid="uuid-42") + db = MagicMock() + db.create_webhook_delivery = AsyncMock(return_value=(existing, False)) + enqueue = AsyncMock() + + webhook = WebhookNodeData( + name="Final Webhook", + enabled=True, + endpoint_url="https://example.com/hook", + payload_template={"event": "call_done"}, + ) + + with ( + patch("api.tasks.run_integrations.db_client", db), + patch("api.tasks.arq.enqueue_job", enqueue), + ): + await _enqueue_webhook_delivery( + webhook_data=webhook, + render_context={}, + organization_id=7, + workflow_run_id=9, + webhook_node_id="node-1", + ) + + db.create_webhook_delivery.assert_awaited_once() + enqueue.assert_not_called() + + +@pytest.mark.asyncio +async def test_enqueue_webhook_delivery_drops_secret_custom_headers(): + created = SimpleNamespace(id=1, delivery_uuid="u") + db = MagicMock() + db.create_webhook_delivery = AsyncMock(return_value=(created, True)) + + webhook = WebhookNodeData( + name="Final Webhook", + enabled=True, + endpoint_url="https://example.com/hook", + payload_template={}, + custom_headers=[ + {"key": "Authorization", "value": "Bearer secret-token"}, + {"key": "X-Custom-Auth-Token", "value": "abc"}, # variant -> dropped + {"key": "X-Idempotency-Key", "value": "idem-1"}, # benign -> kept + {"key": "X-Source", "value": "dograh"}, + ], + ) + + with ( + patch("api.tasks.run_integrations.db_client", db), + patch("api.tasks.arq.enqueue_job", AsyncMock()), + ): + await _enqueue_webhook_delivery( + webhook_data=webhook, + render_context={}, + organization_id=1, + workflow_run_id=1, + webhook_node_id="n", + ) + + persisted = db.create_webhook_delivery.call_args.kwargs["custom_headers"] + keys = {h["key"] for h in persisted} + assert "Authorization" not in keys # secret dropped, not stored in plaintext + assert "X-Custom-Auth-Token" not in keys # variant secret also dropped + assert "X-Idempotency-Key" in keys # benign 'key' header NOT a false positive + assert "X-Source" in keys # non-secret header kept + + +@pytest.mark.asyncio +async def test_enqueue_webhook_delivery_skips_disabled(): + db = MagicMock() + db.create_webhook_delivery = AsyncMock() + + webhook = WebhookNodeData( + name="Disabled", + enabled=False, + endpoint_url="https://example.com/hook", + payload_template={}, + ) + + with patch("api.tasks.run_integrations.db_client", db): + await _enqueue_webhook_delivery( + webhook_data=webhook, + render_context={}, + organization_id=1, + workflow_run_id=1, + webhook_node_id="n", + ) + + db.create_webhook_delivery.assert_not_called() + + +@pytest.mark.asyncio +async def test_get_workflow_run_with_context_uses_workflow_org( + async_session, db_session +): + run_org = OrganizationModel(provider_id=f"run-org-{uuid4()}") + selected_org = OrganizationModel(provider_id=f"selected-org-{uuid4()}") + async_session.add_all([run_org, selected_org]) + await async_session.flush() + + user = UserModel( + provider_id=f"user-{uuid4()}", + selected_organization_id=selected_org.id, + ) + async_session.add(user) + await async_session.flush() + + workflow = WorkflowModel( + name="Webhook Workflow", + user_id=user.id, + organization_id=run_org.id, + workflow_definition={"nodes": [], "edges": []}, + template_context_variables={}, + ) + async_session.add(workflow) + await async_session.flush() + + workflow_run = WorkflowRunModel( + name="Webhook Run", + workflow_id=workflow.id, + mode="test", + ) + async_session.add(workflow_run) + await async_session.flush() + + _, organization_id = await db_session.get_workflow_run_with_context(workflow_run.id) + + assert organization_id == run_org.id + assert organization_id != selected_org.id + + +@pytest.mark.asyncio +async def test_create_webhook_delivery_rejects_org_mismatch(async_session, db_session): + run_org = OrganizationModel(provider_id=f"run-org-{uuid4()}") + wrong_org = OrganizationModel(provider_id=f"wrong-org-{uuid4()}") + async_session.add_all([run_org, wrong_org]) + await async_session.flush() + + user = UserModel( + provider_id=f"user-{uuid4()}", + selected_organization_id=wrong_org.id, + ) + async_session.add(user) + await async_session.flush() + + workflow = WorkflowModel( + name="Webhook Workflow", + user_id=user.id, + organization_id=run_org.id, + workflow_definition={"nodes": [], "edges": []}, + template_context_variables={}, + ) + async_session.add(workflow) + await async_session.flush() + + workflow_run = WorkflowRunModel( + name="Webhook Run", + workflow_id=workflow.id, + mode="test", + ) + async_session.add(workflow_run) + await async_session.flush() + + with pytest.raises(ValueError, match="belongs to organization"): + await db_session.create_webhook_delivery( + workflow_run_id=workflow_run.id, + organization_id=wrong_org.id, + endpoint_url="https://example.com/hook", + payload={"event": "call_done"}, + max_attempts=5, + webhook_node_id="node-1", + ) + + delivery, created = await db_session.create_webhook_delivery( + workflow_run_id=workflow_run.id, + organization_id=run_org.id, + endpoint_url="https://example.com/hook", + payload={"event": "call_done"}, + max_attempts=5, + webhook_node_id="node-1", + ) + + assert created is True + assert delivery.organization_id == run_org.id + + +# --------------------------------------------------------------------------- +# Delivery task: send, retry, dead-letter +# --------------------------------------------------------------------------- + + +def _fake_delivery(**overrides): + base = dict( + id=1, + delivery_uuid="uuid-1", + workflow_run_id=9, + organization_id=7, + webhook_name="Final Webhook", + endpoint_url="https://example.com/hook", + http_method="POST", + payload={"event": "call_done"}, + custom_headers=None, + credential_uuid=None, + status="pending", + attempt_count=0, + max_attempts=5, + ) + base.update(overrides) + return SimpleNamespace(**base) + + +def _mock_httpx(*, raise_request_error=None, status_error=None, status_code=200): + """Patch target for httpx.AsyncClient used by the delivery task.""" + response = MagicMock() + response.status_code = status_code + response.text = "body" + if status_error is not None: + response.raise_for_status = MagicMock(side_effect=status_error) + else: + response.raise_for_status = MagicMock() + + async def _request(**kwargs): + if raise_request_error is not None: + raise raise_request_error + return response + + client = MagicMock() + client.request = AsyncMock(side_effect=_request) + ctx = MagicMock() + ctx.__aenter__ = AsyncMock(return_value=client) + ctx.__aexit__ = AsyncMock(return_value=False) + return MagicMock(return_value=ctx) + + +def _delivery_db(delivery): + db = MagicMock() + # The task claims the delivery atomically before sending; a successful claim + # returns the row. + db.claim_webhook_delivery = AsyncMock(return_value=delivery) + db.get_webhook_delivery = AsyncMock(return_value=delivery) + db.get_credential_by_uuid = AsyncMock(return_value=None) + db.mark_webhook_delivery_succeeded = AsyncMock() + db.schedule_webhook_delivery_retry = AsyncMock() + db.mark_webhook_delivery_dead_letter = AsyncMock() + return db + + +@pytest.mark.asyncio +async def test_deliver_webhook_success(): + delivery = _fake_delivery() + db = _delivery_db(delivery) + + with ( + patch("api.tasks.webhook_delivery.db_client", db), + patch("api.tasks.webhook_delivery.httpx.AsyncClient", _mock_httpx()), + ): + await deliver_webhook(None, delivery.id) + + db.mark_webhook_delivery_succeeded.assert_awaited_once_with(1, 1, 200) + db.schedule_webhook_delivery_retry.assert_not_called() + db.mark_webhook_delivery_dead_letter.assert_not_called() + + +@pytest.mark.asyncio +async def test_deliver_webhook_transient_error_schedules_retry(): + delivery = _fake_delivery(attempt_count=0) + db = _delivery_db(delivery) + enqueue = AsyncMock() + + with ( + patch("api.tasks.webhook_delivery.db_client", db), + patch( + "api.tasks.webhook_delivery.httpx.AsyncClient", + _mock_httpx(raise_request_error=httpx.ConnectTimeout("timed out")), + ), + patch("api.tasks.arq.enqueue_job", enqueue), + ): + await deliver_webhook(None, delivery.id) + + db.schedule_webhook_delivery_retry.assert_awaited_once() + assert db.schedule_webhook_delivery_retry.call_args.kwargs["attempt_count"] == 1 + db.mark_webhook_delivery_dead_letter.assert_not_called() + # Re-enqueued with a deferral and the next attempt's job id. + enqueue.assert_awaited_once() + assert enqueue.call_args.kwargs["_job_id"] == "webhook-delivery-1-1" + assert enqueue.call_args.kwargs["_defer_by"] > 0 + + +@pytest.mark.asyncio +async def test_deliver_webhook_permanent_4xx_dead_letters(): + delivery = _fake_delivery() + db = _delivery_db(delivery) + resp = MagicMock(status_code=401, text="Unauthorized") + status_error = httpx.HTTPStatusError("401", request=MagicMock(), response=resp) + + with ( + patch("api.tasks.webhook_delivery.db_client", db), + patch( + "api.tasks.webhook_delivery.httpx.AsyncClient", + _mock_httpx(status_error=status_error, status_code=401), + ), + ): + await deliver_webhook(None, delivery.id) + + db.mark_webhook_delivery_dead_letter.assert_awaited_once() + db.schedule_webhook_delivery_retry.assert_not_called() + + +@pytest.mark.asyncio +async def test_deliver_webhook_retryable_5xx_schedules_retry(): + delivery = _fake_delivery() + db = _delivery_db(delivery) + enqueue = AsyncMock() + resp = MagicMock(status_code=503, text="unavailable") + status_error = httpx.HTTPStatusError("503", request=MagicMock(), response=resp) + + with ( + patch("api.tasks.webhook_delivery.db_client", db), + patch( + "api.tasks.webhook_delivery.httpx.AsyncClient", + _mock_httpx(status_error=status_error, status_code=503), + ), + patch("api.tasks.arq.enqueue_job", enqueue), + ): + await deliver_webhook(None, delivery.id) + + db.schedule_webhook_delivery_retry.assert_awaited_once() + db.mark_webhook_delivery_dead_letter.assert_not_called() + + +@pytest.mark.asyncio +async def test_deliver_webhook_exhausted_attempts_dead_letters(): + # attempt_count=4 -> this is attempt 5 == max_attempts, so no further retry. + delivery = _fake_delivery(attempt_count=4, max_attempts=5) + db = _delivery_db(delivery) + + with ( + patch("api.tasks.webhook_delivery.db_client", db), + patch( + "api.tasks.webhook_delivery.httpx.AsyncClient", + _mock_httpx(raise_request_error=httpx.ConnectError("boom")), + ), + ): + await deliver_webhook(None, delivery.id) + + db.mark_webhook_delivery_dead_letter.assert_awaited_once() + assert db.mark_webhook_delivery_dead_letter.call_args.args[1] == 5 + db.schedule_webhook_delivery_retry.assert_not_called() + + +@pytest.mark.asyncio +async def test_deliver_webhook_no_op_when_claim_fails(): + # The atomic claim returns None when the delivery is not pending/due or was + # already claimed by a concurrent worker -> no send, no double-fire. + delivery = _fake_delivery(status="succeeded") + db = _delivery_db(delivery) + db.claim_webhook_delivery = AsyncMock(return_value=None) + httpx_mock = _mock_httpx() + + with ( + patch("api.tasks.webhook_delivery.db_client", db), + patch("api.tasks.webhook_delivery.httpx.AsyncClient", httpx_mock), + ): + await deliver_webhook(None, delivery.id) + + httpx_mock.assert_not_called() + db.mark_webhook_delivery_succeeded.assert_not_called() + db.mark_webhook_delivery_dead_letter.assert_not_called() + + +@pytest.mark.asyncio +async def test_deliver_webhook_delivered_but_record_failure_does_not_dead_letter(): + # If the HTTP POST is accepted (2xx) but recording success fails (DB blip), + # the row must NOT be dead-lettered -- it stays pending for the sweeper to + # reconcile (the receiver dedups the re-send via X-Dograh-Delivery-Id). + delivery = _fake_delivery() + db = _delivery_db(delivery) + db.mark_webhook_delivery_succeeded = AsyncMock( + side_effect=RuntimeError("db connection blip") + ) + + with ( + patch("api.tasks.webhook_delivery.db_client", db), + patch("api.tasks.webhook_delivery.httpx.AsyncClient", _mock_httpx()), + ): + await deliver_webhook(None, delivery.id) + + db.mark_webhook_delivery_succeeded.assert_awaited_once() + db.mark_webhook_delivery_dead_letter.assert_not_called() + db.schedule_webhook_delivery_retry.assert_not_called()