dograh/api/tasks/arq.py

124 lines
3.5 KiB
Python
Raw Normal View History

2025-09-09 14:37:32 +05:30
"""ARQ worker configuration - setup logging before importing tasks"""
import ssl
from urllib.parse import urlparse
from api.constants import REDIS_URL
# Setup logging - this is now idempotent and safe to call multiple times
from api.logging_config import setup_logging
from api.tasks.function_names import FunctionNames
setup_logging()
2025-09-09 14:37:32 +05:30
# Now import ARQ and task dependencies
feat(webhooks): durable retrying delivery for final webhooks (#478) * feat(webhooks): durable retrying delivery for final webhooks Final webhook nodes were fired inline with a single best-effort httpx POST (run_integrations._execute_webhook_node). On a transient error the failure was swallowed at three levels, so ARQ never retried and the final call report was permanently lost -- leaving downstream receivers stuck (e.g. a CRM showing a call as still "in conversation"). Replace the one-shot POST with a durable, idempotent delivery pipeline modelled on the campaign retry pattern (persisted row + scheduled_for + bounded attempts): - New webhook_deliveries table (WebhookDeliveryModel) is the source of truth. Payload is rendered once and frozen so retries are deterministic; secrets are not stored -- the credential is referenced by uuid and re-resolved at send time. - run_integrations now persists a delivery row and enqueues deliver_webhook with a deterministic ARQ job id instead of sending inline. - deliver_webhook (new ARQ task) sends the request and: * 2xx -> succeeded * transient -> retry with capped exponential backoff (RequestError / 5xx / 408 / 425 / 429), up to max_attempts then dead_letter * permanent 4xx -> dead_letter immediately (no pointless looping) It is idempotent: a non-pending delivery is a no-op, so a duplicate enqueue or sweeper re-injection can't double-send. - sweep_webhook_deliveries cron (every 5 min) re-enqueues overdue pending deliveries so nothing is lost to a worker restart / Redis flush. - Stable X-Dograh-Delivery-Id / Workflow-Run-Id / Attempt headers let receivers dedupe retried deliveries. - enqueue_job now forwards ARQ job options (_job_id, _defer_by); failures log repr(e) so empty-message errors like ConnectTimeout are diagnosable. Config via DEFAULT_WEBHOOK_DELIVERY_CONFIG (env-overridable): max_attempts=5, base_delay=30s, max_delay=600s, timeout=30s. Tests cover payload rendering, persist+enqueue, success, transient retry, retryable 5xx, permanent 4xx dead-letter, attempt exhaustion, and idempotency. Migration verified to apply/rollback against Postgres; table/enum/indexes confirmed. * fix(webhooks): atomic claim, safe success-recording, sweep paging, migration cleanup Address review feedback on the webhook delivery pipeline: - deliver_webhook now atomically claims a delivery (conditional UPDATE that leases scheduled_for) before sending, so concurrent ARQ executions can't double-send (the prior status=='pending' read was non-atomic). - Recording success is moved out of the dead-letter try-block: if the receiver accepted the payload (2xx) but the success DB-write fails, the row is left pending for the sweeper to reconcile instead of being dead-lettered. - The sweep keyset-paginates by id so a backlog over the page size is fully drained, and logs the true re-enqueued total. - Migration downgrade drops the enum via op.execute(DROP TYPE IF EXISTS ...) instead of the deprecated op.get_bind(). * fix(webhooks): idempotent delivery creation and drop secret custom headers Address the remaining review feedback: - Add a (workflow_run_id, webhook_node_id) unique constraint and make create_webhook_delivery a get-or-create returning (delivery, created). A retried run_integrations now reuses the existing row instead of creating and sending a duplicate final webhook; only a freshly-created row is enqueued. - Stop persisting secret-looking custom headers (Authorization, X-API-Key, Cookie, ...) in plaintext on the delivery row: they are dropped with a warning pointing at the credential store (which is re-resolved securely at send time). Non-secret custom headers are unaffected. * fix(webhooks): harden idempotency key, secret-header match, sweep reclaim id Address follow-up review feedback: - webhook_node_id is now NOT NULL so a NULL can't slip past the (workflow_run_id, webhook_node_id) unique constraint and create duplicates. - Secret-header filtering matches normalized markers (auth/token/secret/cookie/ api-key/...) instead of an exact name list, catching variants like X-Custom-Auth-Token while leaving benign headers (e.g. X-Idempotency-Key). - The sweeper re-enqueues with a reclaim-specific job id (the lease timestamp) so reconciling a delivered-but-unrecorded row isn't deduped against the original attempt's already-completed ARQ job. The atomic claim still ensures at most one send. * fix(webhooks): scope delivery rows to workflow org --------- Co-authored-by: Abhishek Kumar <abhishek@a6k.me>
2026-07-02 17:14:14 +01:00
from arq import create_pool, cron
2025-09-09 14:37:32 +05:30
from arq.connections import ArqRedis, RedisSettings
parsed_url = urlparse(REDIS_URL)
# Check if we're using TLS (rediss://)
use_ssl = parsed_url.scheme == "rediss"
# Create SSL context if using rediss://
ssl_context = None
if use_ssl:
ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
REDIS_SETTINGS = RedisSettings(
host=parsed_url.hostname or "localhost",
port=parsed_url.port or 6379,
password=parsed_url.password,
conn_timeout=10,
ssl=use_ssl,
ssl_ca_certs=None if not use_ssl else None,
ssl_certfile=None,
ssl_keyfile=None,
ssl_check_hostname=False if use_ssl else None,
)
from api.tasks.campaign_tasks import (
process_campaign_batch,
sync_campaign_source,
)
from api.tasks.knowledge_base_processing import process_knowledge_base_document
2025-09-09 14:37:32 +05:30
from api.tasks.run_integrations import run_integrations_post_workflow_run
feat(webhooks): durable retrying delivery for final webhooks (#478) * feat(webhooks): durable retrying delivery for final webhooks Final webhook nodes were fired inline with a single best-effort httpx POST (run_integrations._execute_webhook_node). On a transient error the failure was swallowed at three levels, so ARQ never retried and the final call report was permanently lost -- leaving downstream receivers stuck (e.g. a CRM showing a call as still "in conversation"). Replace the one-shot POST with a durable, idempotent delivery pipeline modelled on the campaign retry pattern (persisted row + scheduled_for + bounded attempts): - New webhook_deliveries table (WebhookDeliveryModel) is the source of truth. Payload is rendered once and frozen so retries are deterministic; secrets are not stored -- the credential is referenced by uuid and re-resolved at send time. - run_integrations now persists a delivery row and enqueues deliver_webhook with a deterministic ARQ job id instead of sending inline. - deliver_webhook (new ARQ task) sends the request and: * 2xx -> succeeded * transient -> retry with capped exponential backoff (RequestError / 5xx / 408 / 425 / 429), up to max_attempts then dead_letter * permanent 4xx -> dead_letter immediately (no pointless looping) It is idempotent: a non-pending delivery is a no-op, so a duplicate enqueue or sweeper re-injection can't double-send. - sweep_webhook_deliveries cron (every 5 min) re-enqueues overdue pending deliveries so nothing is lost to a worker restart / Redis flush. - Stable X-Dograh-Delivery-Id / Workflow-Run-Id / Attempt headers let receivers dedupe retried deliveries. - enqueue_job now forwards ARQ job options (_job_id, _defer_by); failures log repr(e) so empty-message errors like ConnectTimeout are diagnosable. Config via DEFAULT_WEBHOOK_DELIVERY_CONFIG (env-overridable): max_attempts=5, base_delay=30s, max_delay=600s, timeout=30s. Tests cover payload rendering, persist+enqueue, success, transient retry, retryable 5xx, permanent 4xx dead-letter, attempt exhaustion, and idempotency. Migration verified to apply/rollback against Postgres; table/enum/indexes confirmed. * fix(webhooks): atomic claim, safe success-recording, sweep paging, migration cleanup Address review feedback on the webhook delivery pipeline: - deliver_webhook now atomically claims a delivery (conditional UPDATE that leases scheduled_for) before sending, so concurrent ARQ executions can't double-send (the prior status=='pending' read was non-atomic). - Recording success is moved out of the dead-letter try-block: if the receiver accepted the payload (2xx) but the success DB-write fails, the row is left pending for the sweeper to reconcile instead of being dead-lettered. - The sweep keyset-paginates by id so a backlog over the page size is fully drained, and logs the true re-enqueued total. - Migration downgrade drops the enum via op.execute(DROP TYPE IF EXISTS ...) instead of the deprecated op.get_bind(). * fix(webhooks): idempotent delivery creation and drop secret custom headers Address the remaining review feedback: - Add a (workflow_run_id, webhook_node_id) unique constraint and make create_webhook_delivery a get-or-create returning (delivery, created). A retried run_integrations now reuses the existing row instead of creating and sending a duplicate final webhook; only a freshly-created row is enqueued. - Stop persisting secret-looking custom headers (Authorization, X-API-Key, Cookie, ...) in plaintext on the delivery row: they are dropped with a warning pointing at the credential store (which is re-resolved securely at send time). Non-secret custom headers are unaffected. * fix(webhooks): harden idempotency key, secret-header match, sweep reclaim id Address follow-up review feedback: - webhook_node_id is now NOT NULL so a NULL can't slip past the (workflow_run_id, webhook_node_id) unique constraint and create duplicates. - Secret-header filtering matches normalized markers (auth/token/secret/cookie/ api-key/...) instead of an exact name list, catching variants like X-Custom-Auth-Token while leaving benign headers (e.g. X-Idempotency-Key). - The sweeper re-enqueues with a reclaim-specific job id (the lease timestamp) so reconciling a delivered-but-unrecorded row isn't deduped against the original attempt's already-completed ARQ job. The atomic claim still ensures at most one send. * fix(webhooks): scope delivery rows to workflow org --------- Co-authored-by: Abhishek Kumar <abhishek@a6k.me>
2026-07-02 17:14:14 +01:00
from api.tasks.webhook_delivery import deliver_webhook, sweep_webhook_deliveries
from api.tasks.workflow_completion import process_workflow_completion
2025-09-09 14:37:32 +05:30
class WorkerSettings:
functions = [
run_integrations_post_workflow_run,
process_workflow_completion,
2025-09-09 14:37:32 +05:30
sync_campaign_source,
process_campaign_batch,
process_knowledge_base_document,
feat(webhooks): durable retrying delivery for final webhooks (#478) * feat(webhooks): durable retrying delivery for final webhooks Final webhook nodes were fired inline with a single best-effort httpx POST (run_integrations._execute_webhook_node). On a transient error the failure was swallowed at three levels, so ARQ never retried and the final call report was permanently lost -- leaving downstream receivers stuck (e.g. a CRM showing a call as still "in conversation"). Replace the one-shot POST with a durable, idempotent delivery pipeline modelled on the campaign retry pattern (persisted row + scheduled_for + bounded attempts): - New webhook_deliveries table (WebhookDeliveryModel) is the source of truth. Payload is rendered once and frozen so retries are deterministic; secrets are not stored -- the credential is referenced by uuid and re-resolved at send time. - run_integrations now persists a delivery row and enqueues deliver_webhook with a deterministic ARQ job id instead of sending inline. - deliver_webhook (new ARQ task) sends the request and: * 2xx -> succeeded * transient -> retry with capped exponential backoff (RequestError / 5xx / 408 / 425 / 429), up to max_attempts then dead_letter * permanent 4xx -> dead_letter immediately (no pointless looping) It is idempotent: a non-pending delivery is a no-op, so a duplicate enqueue or sweeper re-injection can't double-send. - sweep_webhook_deliveries cron (every 5 min) re-enqueues overdue pending deliveries so nothing is lost to a worker restart / Redis flush. - Stable X-Dograh-Delivery-Id / Workflow-Run-Id / Attempt headers let receivers dedupe retried deliveries. - enqueue_job now forwards ARQ job options (_job_id, _defer_by); failures log repr(e) so empty-message errors like ConnectTimeout are diagnosable. Config via DEFAULT_WEBHOOK_DELIVERY_CONFIG (env-overridable): max_attempts=5, base_delay=30s, max_delay=600s, timeout=30s. Tests cover payload rendering, persist+enqueue, success, transient retry, retryable 5xx, permanent 4xx dead-letter, attempt exhaustion, and idempotency. Migration verified to apply/rollback against Postgres; table/enum/indexes confirmed. * fix(webhooks): atomic claim, safe success-recording, sweep paging, migration cleanup Address review feedback on the webhook delivery pipeline: - deliver_webhook now atomically claims a delivery (conditional UPDATE that leases scheduled_for) before sending, so concurrent ARQ executions can't double-send (the prior status=='pending' read was non-atomic). - Recording success is moved out of the dead-letter try-block: if the receiver accepted the payload (2xx) but the success DB-write fails, the row is left pending for the sweeper to reconcile instead of being dead-lettered. - The sweep keyset-paginates by id so a backlog over the page size is fully drained, and logs the true re-enqueued total. - Migration downgrade drops the enum via op.execute(DROP TYPE IF EXISTS ...) instead of the deprecated op.get_bind(). * fix(webhooks): idempotent delivery creation and drop secret custom headers Address the remaining review feedback: - Add a (workflow_run_id, webhook_node_id) unique constraint and make create_webhook_delivery a get-or-create returning (delivery, created). A retried run_integrations now reuses the existing row instead of creating and sending a duplicate final webhook; only a freshly-created row is enqueued. - Stop persisting secret-looking custom headers (Authorization, X-API-Key, Cookie, ...) in plaintext on the delivery row: they are dropped with a warning pointing at the credential store (which is re-resolved securely at send time). Non-secret custom headers are unaffected. * fix(webhooks): harden idempotency key, secret-header match, sweep reclaim id Address follow-up review feedback: - webhook_node_id is now NOT NULL so a NULL can't slip past the (workflow_run_id, webhook_node_id) unique constraint and create duplicates. - Secret-header filtering matches normalized markers (auth/token/secret/cookie/ api-key/...) instead of an exact name list, catching variants like X-Custom-Auth-Token while leaving benign headers (e.g. X-Idempotency-Key). - The sweeper re-enqueues with a reclaim-specific job id (the lease timestamp) so reconciling a delivered-but-unrecorded row isn't deduped against the original attempt's already-completed ARQ job. The atomic claim still ensures at most one send. * fix(webhooks): scope delivery rows to workflow org --------- Co-authored-by: Abhishek Kumar <abhishek@a6k.me>
2026-07-02 17:14:14 +01:00
deliver_webhook,
]
cron_jobs = [
# Safety net for webhook deliveries whose ARQ job was lost (worker
# restart / Redis flush): re-enqueue any pending delivery that is overdue.
cron(
sweep_webhook_deliveries,
minute=set(range(0, 60, 5)),
second=0,
run_at_startup=True,
),
2025-09-09 14:37:32 +05:30
]
redis_settings = REDIS_SETTINGS
max_jobs = 10
LOG_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
# --- Handlers ---
"handlers": {
"console": { # everything goes to stdout
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
"level": "WARNING", # only WARNING and above
"formatter": "simple",
},
},
# --- Formatters (optional) ---
"formatters": {
"simple": {
"format": "%(asctime)s | %(levelname)s | %(name)s | %(message)s",
},
},
# --- Root logger ---
"root": {
"handlers": ["console"],
"level": "WARNING",
},
# --- Optionally silence Arq itself explicitly ---
"loggers": {
"arq": { # arq.* loggers
"level": "WARNING",
"handlers": ["console"],
"propagate": False,
},
},
}
_redis_pool: ArqRedis | None = None
async def get_arq_redis() -> ArqRedis:
global _redis_pool
if _redis_pool is None:
_redis_pool = await create_pool(REDIS_SETTINGS)
return _redis_pool
feat(webhooks): durable retrying delivery for final webhooks (#478) * feat(webhooks): durable retrying delivery for final webhooks Final webhook nodes were fired inline with a single best-effort httpx POST (run_integrations._execute_webhook_node). On a transient error the failure was swallowed at three levels, so ARQ never retried and the final call report was permanently lost -- leaving downstream receivers stuck (e.g. a CRM showing a call as still "in conversation"). Replace the one-shot POST with a durable, idempotent delivery pipeline modelled on the campaign retry pattern (persisted row + scheduled_for + bounded attempts): - New webhook_deliveries table (WebhookDeliveryModel) is the source of truth. Payload is rendered once and frozen so retries are deterministic; secrets are not stored -- the credential is referenced by uuid and re-resolved at send time. - run_integrations now persists a delivery row and enqueues deliver_webhook with a deterministic ARQ job id instead of sending inline. - deliver_webhook (new ARQ task) sends the request and: * 2xx -> succeeded * transient -> retry with capped exponential backoff (RequestError / 5xx / 408 / 425 / 429), up to max_attempts then dead_letter * permanent 4xx -> dead_letter immediately (no pointless looping) It is idempotent: a non-pending delivery is a no-op, so a duplicate enqueue or sweeper re-injection can't double-send. - sweep_webhook_deliveries cron (every 5 min) re-enqueues overdue pending deliveries so nothing is lost to a worker restart / Redis flush. - Stable X-Dograh-Delivery-Id / Workflow-Run-Id / Attempt headers let receivers dedupe retried deliveries. - enqueue_job now forwards ARQ job options (_job_id, _defer_by); failures log repr(e) so empty-message errors like ConnectTimeout are diagnosable. Config via DEFAULT_WEBHOOK_DELIVERY_CONFIG (env-overridable): max_attempts=5, base_delay=30s, max_delay=600s, timeout=30s. Tests cover payload rendering, persist+enqueue, success, transient retry, retryable 5xx, permanent 4xx dead-letter, attempt exhaustion, and idempotency. Migration verified to apply/rollback against Postgres; table/enum/indexes confirmed. * fix(webhooks): atomic claim, safe success-recording, sweep paging, migration cleanup Address review feedback on the webhook delivery pipeline: - deliver_webhook now atomically claims a delivery (conditional UPDATE that leases scheduled_for) before sending, so concurrent ARQ executions can't double-send (the prior status=='pending' read was non-atomic). - Recording success is moved out of the dead-letter try-block: if the receiver accepted the payload (2xx) but the success DB-write fails, the row is left pending for the sweeper to reconcile instead of being dead-lettered. - The sweep keyset-paginates by id so a backlog over the page size is fully drained, and logs the true re-enqueued total. - Migration downgrade drops the enum via op.execute(DROP TYPE IF EXISTS ...) instead of the deprecated op.get_bind(). * fix(webhooks): idempotent delivery creation and drop secret custom headers Address the remaining review feedback: - Add a (workflow_run_id, webhook_node_id) unique constraint and make create_webhook_delivery a get-or-create returning (delivery, created). A retried run_integrations now reuses the existing row instead of creating and sending a duplicate final webhook; only a freshly-created row is enqueued. - Stop persisting secret-looking custom headers (Authorization, X-API-Key, Cookie, ...) in plaintext on the delivery row: they are dropped with a warning pointing at the credential store (which is re-resolved securely at send time). Non-secret custom headers are unaffected. * fix(webhooks): harden idempotency key, secret-header match, sweep reclaim id Address follow-up review feedback: - webhook_node_id is now NOT NULL so a NULL can't slip past the (workflow_run_id, webhook_node_id) unique constraint and create duplicates. - Secret-header filtering matches normalized markers (auth/token/secret/cookie/ api-key/...) instead of an exact name list, catching variants like X-Custom-Auth-Token while leaving benign headers (e.g. X-Idempotency-Key). - The sweeper re-enqueues with a reclaim-specific job id (the lease timestamp) so reconciling a delivered-but-unrecorded row isn't deduped against the original attempt's already-completed ARQ job. The atomic claim still ensures at most one send. * fix(webhooks): scope delivery rows to workflow org --------- Co-authored-by: Abhishek Kumar <abhishek@a6k.me>
2026-07-02 17:14:14 +01:00
async def enqueue_job(function_name: FunctionNames, *args, **kwargs):
2025-09-09 14:37:32 +05:30
redis = await get_arq_redis()
feat(webhooks): durable retrying delivery for final webhooks (#478) * feat(webhooks): durable retrying delivery for final webhooks Final webhook nodes were fired inline with a single best-effort httpx POST (run_integrations._execute_webhook_node). On a transient error the failure was swallowed at three levels, so ARQ never retried and the final call report was permanently lost -- leaving downstream receivers stuck (e.g. a CRM showing a call as still "in conversation"). Replace the one-shot POST with a durable, idempotent delivery pipeline modelled on the campaign retry pattern (persisted row + scheduled_for + bounded attempts): - New webhook_deliveries table (WebhookDeliveryModel) is the source of truth. Payload is rendered once and frozen so retries are deterministic; secrets are not stored -- the credential is referenced by uuid and re-resolved at send time. - run_integrations now persists a delivery row and enqueues deliver_webhook with a deterministic ARQ job id instead of sending inline. - deliver_webhook (new ARQ task) sends the request and: * 2xx -> succeeded * transient -> retry with capped exponential backoff (RequestError / 5xx / 408 / 425 / 429), up to max_attempts then dead_letter * permanent 4xx -> dead_letter immediately (no pointless looping) It is idempotent: a non-pending delivery is a no-op, so a duplicate enqueue or sweeper re-injection can't double-send. - sweep_webhook_deliveries cron (every 5 min) re-enqueues overdue pending deliveries so nothing is lost to a worker restart / Redis flush. - Stable X-Dograh-Delivery-Id / Workflow-Run-Id / Attempt headers let receivers dedupe retried deliveries. - enqueue_job now forwards ARQ job options (_job_id, _defer_by); failures log repr(e) so empty-message errors like ConnectTimeout are diagnosable. Config via DEFAULT_WEBHOOK_DELIVERY_CONFIG (env-overridable): max_attempts=5, base_delay=30s, max_delay=600s, timeout=30s. Tests cover payload rendering, persist+enqueue, success, transient retry, retryable 5xx, permanent 4xx dead-letter, attempt exhaustion, and idempotency. Migration verified to apply/rollback against Postgres; table/enum/indexes confirmed. * fix(webhooks): atomic claim, safe success-recording, sweep paging, migration cleanup Address review feedback on the webhook delivery pipeline: - deliver_webhook now atomically claims a delivery (conditional UPDATE that leases scheduled_for) before sending, so concurrent ARQ executions can't double-send (the prior status=='pending' read was non-atomic). - Recording success is moved out of the dead-letter try-block: if the receiver accepted the payload (2xx) but the success DB-write fails, the row is left pending for the sweeper to reconcile instead of being dead-lettered. - The sweep keyset-paginates by id so a backlog over the page size is fully drained, and logs the true re-enqueued total. - Migration downgrade drops the enum via op.execute(DROP TYPE IF EXISTS ...) instead of the deprecated op.get_bind(). * fix(webhooks): idempotent delivery creation and drop secret custom headers Address the remaining review feedback: - Add a (workflow_run_id, webhook_node_id) unique constraint and make create_webhook_delivery a get-or-create returning (delivery, created). A retried run_integrations now reuses the existing row instead of creating and sending a duplicate final webhook; only a freshly-created row is enqueued. - Stop persisting secret-looking custom headers (Authorization, X-API-Key, Cookie, ...) in plaintext on the delivery row: they are dropped with a warning pointing at the credential store (which is re-resolved securely at send time). Non-secret custom headers are unaffected. * fix(webhooks): harden idempotency key, secret-header match, sweep reclaim id Address follow-up review feedback: - webhook_node_id is now NOT NULL so a NULL can't slip past the (workflow_run_id, webhook_node_id) unique constraint and create duplicates. - Secret-header filtering matches normalized markers (auth/token/secret/cookie/ api-key/...) instead of an exact name list, catching variants like X-Custom-Auth-Token while leaving benign headers (e.g. X-Idempotency-Key). - The sweeper re-enqueues with a reclaim-specific job id (the lease timestamp) so reconciling a delivered-but-unrecorded row isn't deduped against the original attempt's already-completed ARQ job. The atomic claim still ensures at most one send. * fix(webhooks): scope delivery rows to workflow org --------- Co-authored-by: Abhishek Kumar <abhishek@a6k.me>
2026-07-02 17:14:14 +01:00
# kwargs forwards ARQ job options (e.g. _job_id, _defer_by) used for
# deterministic, backed-off webhook delivery retries.
return await redis.enqueue_job(function_name, *args, **kwargs)